Support snapshot creation and recovery in distributed

Summary:
Add custom encoder/decoder

Update snapshot recovery

Reviewers: florijan, teon.banek, mferencevic, mculinovic

Reviewed By: florijan

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1142
This commit is contained in:
Dominik Gleich 2018-01-29 19:16:06 +01:00
parent d41ffb5039
commit c37bb87ed8
17 changed files with 456 additions and 96 deletions

View File

@ -19,7 +19,7 @@ class DecodedValue;
* The decoder writes data into this structure.
*/
struct DecodedVertex {
gid::Gid id;
int64_t id;
std::vector<std::string> labels;
std::map<std::string, DecodedValue> properties;
};
@ -29,7 +29,7 @@ struct DecodedVertex {
* The decoder writes data into this structure.
*/
struct DecodedEdge {
gid::Gid id;
int64_t id;
int64_t from;
int64_t to;
std::string type;
@ -41,7 +41,7 @@ struct DecodedEdge {
* The decoder writes data into this structure.
*/
struct DecodedUnboundedEdge {
gid::Gid id;
int64_t id;
std::string type;
std::map<std::string, DecodedValue> properties;
};

View File

@ -425,6 +425,25 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
edge_type);
}
EdgeAccessor GraphDbAccessor::InsertOnlyEdge(Edges::VertexAddress &from,
Edges::VertexAddress &to,
storage::EdgeType edge_type,
gid::Gid edge_gid) {
auto gid = db_.storage().edge_generator_.Next(edge_gid);
DCHECK(gid == edge_gid) << "Gid should be equal as edge gid since "
"this edges are only added after vertices "
"reference them by their gid";
auto edge_vlist =
new mvcc::VersionList<Edge>(transaction_, gid, from, to, edge_type);
// We need to insert edge_vlist to edges_ before calling update since update
// can throw and edge_vlist will not be garbage collected if it is not in
// edges_ skiplist.
bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second;
CHECK(success) << "Attempting to insert an edge with an existing GID: "
<< gid;
return EdgeAccessor(edge_vlist, *this, from, to, edge_type);
}
int64_t GraphDbAccessor::EdgesCount() const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().edges_.access().size();

View File

@ -278,6 +278,14 @@ class GraphDbAccessor {
std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt);
/**
* Insert edge into main storage, but don't insert it into from and to
* vertices edge lists.
*/
EdgeAccessor InsertOnlyEdge(Edges::VertexAddress &from,
Edges::VertexAddress &to,
storage::EdgeType edge_type, gid::Gid edge_gid);
/**
* Removes an edge from the graph. Parameters can indicate if the edge should
* be removed from data structures in vertices it connects. When removing an
@ -551,6 +559,12 @@ class GraphDbAccessor {
template <typename TRecord>
distributed::RemoteCache<TRecord> &remote_elements();
/// Gets the local address for the given gid. Fails if not present.
mvcc::VersionList<Vertex> *LocalVertexAddress(gid::Gid gid) const;
/// Gets the local edge address for the given gid. Fails if not present.
mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
private:
GraphDb &db_;
tx::Transaction &transaction_;
@ -607,11 +621,5 @@ class GraphDbAccessor {
void UpdatePropertyIndex(storage::Property property,
const RecordAccessor<Vertex> &vertex_accessor,
const Vertex *const vertex);
/// Gets the local address for the given gid. Fails if not present.
mvcc::VersionList<Vertex> *LocalVertexAddress(gid::Gid gid) const;
/// Gets the local edge address for the given gid. Fails if not present.
mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
};
} // namespace database

View File

@ -17,7 +17,7 @@ namespace impl {
// global one, using the given worker_id.
template <typename TArchive, typename TAddress>
void SaveAddress(TArchive &ar, TAddress address, int worker_id) {
auto gid = address.is_remote() ? address.global_id() : address.local()->gid_;
auto gid = address.is_remote() ? address.gid() : address.local()->gid_;
ar << gid;
ar << worker_id;
};

View File

@ -3,10 +3,11 @@
#include <limits>
#include <unordered_map>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp"
#include "durability/snapshot_decoded_value.hpp"
#include "durability/snapshot_decoder.hpp"
#include "durability/version.hpp"
#include "durability/wal.hpp"
#include "query/typed_value.hpp"
@ -55,10 +56,9 @@ struct RecoveryData {
bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
RecoveryData &recovery_data) {
HashedFileReader reader;
communication::bolt::Decoder<HashedFileReader> decoder(reader);
SnapshotDecoder<HashedFileReader> decoder(reader);
RETURN_IF_NOT(reader.Open(snapshot_file));
std::unordered_map<uint64_t, VertexAccessor> vertices;
auto magic_number = durability::kMagicNumber;
reader.Read(magic_number.data(), magic_number.size());
@ -112,29 +112,75 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
}
database::GraphDbAccessor dba(db);
std::unordered_map<gid::Gid,
std::pair<Edges::VertexAddress, Edges::VertexAddress>>
edge_gid_endpoints_mapping;
for (int64_t i = 0; i < vertex_count; ++i) {
DecodedValue vertex_dv;
RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex));
auto &vertex = vertex_dv.ValueVertex();
auto vertex_accessor = dba.InsertVertex(vertex.id);
for (const auto &label : vertex.labels) {
auto vertex = decoder.ReadSnapshotVertex();
RETURN_IF_NOT(vertex);
auto vertex_accessor = dba.InsertVertex(vertex->gid);
for (const auto &label : vertex->labels) {
vertex_accessor.add_label(dba.Label(label));
}
for (const auto &property_pair : vertex.properties) {
for (const auto &property_pair : vertex->properties) {
vertex_accessor.PropsSet(dba.Property(property_pair.first),
query::TypedValue(property_pair.second));
}
vertices.insert({vertex.id, vertex_accessor});
auto vertex_record = vertex_accessor.GetNew();
for (const auto &edge : vertex->in) {
vertex_record->in_.emplace(edge.vertex, edge.address,
dba.EdgeType(edge.type));
edge_gid_endpoints_mapping[edge.address.gid()] = {
edge.vertex, vertex_accessor.GlobalAddress()};
}
for (const auto &edge : vertex->out) {
vertex_record->out_.emplace(edge.vertex, edge.address,
dba.EdgeType(edge.type));
edge_gid_endpoints_mapping[edge.address.gid()] = {
vertex_accessor.GlobalAddress(), edge.vertex};
}
}
auto vertex_transform_to_local_if_possible =
[&db, &dba](Edges::VertexAddress &address) {
if (address.is_local()) return;
// If the worker id matches it should be a local apperance
if (address.worker_id() == db.WorkerId()) {
address = Edges::VertexAddress(dba.LocalVertexAddress(address.gid()));
CHECK(address.is_local()) << "Address should be local but isn't";
}
};
auto edge_transform_to_local_if_possible =
[&db, &dba](Edges::EdgeAddress &address) {
if (address.is_local()) return;
// If the worker id matches it should be a local apperance
if (address.worker_id() == db.WorkerId()) {
address = Edges::EdgeAddress(dba.LocalEdgeAddress(address.gid()));
CHECK(address.is_local()) << "Address should be local but isn't";
}
};
for (int64_t i = 0; i < edge_count; ++i) {
DecodedValue edge_dv;
RETURN_IF_NOT(decoder.ReadValue(&edge_dv, DecodedValue::Type::Edge));
auto &edge = edge_dv.ValueEdge();
auto it_from = vertices.find(edge.from);
auto it_to = vertices.find(edge.to);
RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end());
auto edge_accessor = dba.InsertEdge(it_from->second, it_to->second,
dba.EdgeType(edge.type), edge.id);
RETURN_IF_NOT(
decoder.ReadValue(&dv, communication::bolt::DecodedValue::Type::Edge));
auto &edge = dv.ValueEdge();
// We have to take full edge endpoints from vertices since the endpoints
// found here don't containt worker_id, and this can't be changed since this
// edges must be bolt-compliant
auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id];
Edges::VertexAddress from;
Edges::VertexAddress to;
std::tie(from, to) = edge_endpoints;
// From and to are written in the global_address format and we should
// convert them back to local format for speedup - if possible
vertex_transform_to_local_if_possible(from);
vertex_transform_to_local_if_possible(to);
auto edge_accessor =
dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type), edge.id);
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(dba.Property(property_pair.first),
@ -149,6 +195,33 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
dba.Abort();
return false;
}
// We have to replace global_ids with local ids where possible for all edges
// in every vertex and this can only be done after we inserted the edges; this
// is to speedup execution
for (auto &vertex_accessor : dba.Vertices(true)) {
auto vertex = vertex_accessor.GetNew();
auto iterate_and_transform =
[vertex_transform_to_local_if_possible,
edge_transform_to_local_if_possible](Edges &edges) {
Edges transformed;
for (auto &element : edges) {
auto vertex = element.vertex;
vertex_transform_to_local_if_possible(vertex);
auto edge = element.edge;
edge_transform_to_local_if_possible(edge);
transformed.emplace(vertex, edge, element.edge_type);
}
return transformed;
};
vertex->in_ = iterate_and_transform(vertex->in_);
vertex->out_ = iterate_and_transform(vertex->out_);
}
dba.Commit();
return true;
}

View File

@ -4,10 +4,10 @@
#include "durability/snapshooter.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/snapshot_encoder.hpp"
#include "durability/version.hpp"
#include "utils/datetime/timestamp.hpp"
@ -20,7 +20,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
database::GraphDbAccessor &dba) {
try {
HashedFileWriter buffer(snapshot_file);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
SnapshotEncoder<HashedFileWriter> encoder(buffer);
int64_t vertex_num = 0, edge_num = 0;
encoder.WriteRAW(durability::kMagicNumber.data(),
@ -59,7 +59,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
}
for (const auto &vertex : dba.Vertices(false)) {
encoder.WriteVertex(vertex);
encoder.WriteSnapshotVertex(vertex);
vertex_num++;
}
for (const auto &edge : dba.Edges(false)) {

View File

@ -0,0 +1,43 @@
#pragma once
#include <map>
#include <string>
#include <vector>
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "query/typed_value.hpp"
#include "storage/property_value.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
namespace durability {
/** Forward declartion of DecodedSnapshotEdge. */
struct DecodedInlinedVertexEdge;
/**
* Structure used when reading a Vertex with the decoder.
* The decoder writes data into this structure.
*/
struct DecodedSnapshotVertex {
gid::Gid gid;
std::vector<std::string> labels;
std::map<std::string, communication::bolt::DecodedValue> properties;
// Vector of edges without properties
std::vector<DecodedInlinedVertexEdge> in;
std::vector<DecodedInlinedVertexEdge> out;
};
/**
* Structure used when reading an Edge with the snapshot decoder.
* The decoder writes data into this structure.
*/
struct DecodedInlinedVertexEdge {
// Addresses down below must always be global_address and never direct
// pointers to a record.
Edges::EdgeAddress address;
Edges::VertexAddress vertex;
std::string type;
};
} // namespace durability

View File

@ -0,0 +1,92 @@
#pragma once
#include <experimental/optional>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "durability/snapshot_decoded_value.hpp"
namespace durability {
using namespace communication::bolt;
template <typename Buffer>
class SnapshotDecoder : public Decoder<Buffer> {
public:
explicit SnapshotDecoder(Buffer &buffer) : Decoder<Buffer>(buffer) {}
std::experimental::optional<DecodedSnapshotVertex> ReadSnapshotVertex() {
DecodedValue dv;
DecodedSnapshotVertex vertex;
if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Vertex)) {
DLOG(WARNING) << "Unable to read snapshot vertex";
return std::experimental::nullopt;
}
auto &read_vertex = dv.ValueVertex();
vertex.gid = read_vertex.id;
vertex.labels = read_vertex.labels;
vertex.properties = read_vertex.properties;
if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of in "
"edges in vertex!";
return std::experimental::nullopt;
}
for (int i = 0; i < dv.ValueInt(); ++i) {
auto edge = ReadSnapshotEdge();
if (!edge) return std::experimental::nullopt;
vertex.in.emplace_back(*edge);
}
if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of out "
"edges in vertex!";
return std::experimental::nullopt;
}
for (int i = 0; i < dv.ValueInt(); ++i) {
auto edge = ReadSnapshotEdge();
if (!edge) return std::experimental::nullopt;
vertex.out.emplace_back(*edge);
}
VLOG(1) << "[ReadSnapshotVertex] Success";
return vertex;
}
private:
std::experimental::optional<DecodedInlinedVertexEdge> ReadSnapshotEdge() {
DecodedValue dv;
DecodedInlinedVertexEdge edge;
VLOG(1) << "[ReadSnapshotEdge] Start";
// read ID
if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read ID!";
return std::experimental::nullopt;
}
edge.address = dv.ValueInt();
// read other side
if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from address!";
return std::experimental::nullopt;
}
edge.vertex = dv.ValueInt();
// read type
if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::String)) {
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read type!";
return std::experimental::nullopt;
}
edge.type = dv.ValueString();
VLOG(1) << "[ReadSnapshotEdge] Success";
return edge;
}
};
}; // namespace durability

View File

@ -0,0 +1,49 @@
#pragma once
#include "communication/bolt/v1/encoder/base_encoder.hpp"
namespace durability {
using namespace communication::bolt;
template <typename Buffer>
class SnapshotEncoder : public BaseEncoder<Buffer> {
public:
explicit SnapshotEncoder(Buffer &buffer) : BaseEncoder<Buffer>(buffer) {}
void WriteSnapshotVertex(const VertexAccessor &vertex) {
BaseEncoder<Buffer>::WriteVertex(vertex);
// write in edges without properties
this->WriteUInt(vertex.in_degree());
auto edges_in = vertex.in();
for (const auto &edge : edges_in) {
this->WriteSnapshotEdge(edge, true);
}
// write out edges without properties
this->WriteUInt(vertex.out_degree());
auto edges_out = vertex.out();
for (const auto &edge : edges_out) {
this->WriteSnapshotEdge(edge, false);
}
}
private:
void WriteUInt(const uint64_t &value) {
this->WriteInt(*reinterpret_cast<const int64_t *>(&value));
}
// Writes edge without properties
void WriteSnapshotEdge(const EdgeAccessor &edge, bool write_from) {
WriteUInt(edge.GlobalAddress().raw());
if (write_from)
WriteUInt(edge.from().GlobalAddress().raw());
else
WriteUInt(edge.to().GlobalAddress().raw());
// write type
this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType()));
}
};
} // namespace durability

View File

@ -35,6 +35,11 @@ class Address {
static constexpr uint64_t kRemote{1};
public:
Address() {}
// Constructor for raw address value
Address(Storage storage) : storage_(storage) {}
// Constructor for local Address.
Address(TLocalObj *ptr) {
uintptr_t ptr_no_type = reinterpret_cast<uintptr_t>(ptr);
@ -63,7 +68,7 @@ class Address {
return reinterpret_cast<TLocalObj *>(storage_);
}
gid::Gid global_id() const {
gid::Gid gid() const {
DCHECK(is_remote()) << "Attempting to get global ID from local address";
return storage_ >> (kTypeMaskSize + kWorkerIdSize);
}
@ -74,6 +79,9 @@ class Address {
return (storage_ >> 1) & ((1ULL << kWorkerIdSize) - 1);
}
/// Returns raw address value
Storage raw() const { return storage_; }
bool operator==(const Address<TLocalObj> &other) const {
return storage_ == other.storage_;
}

View File

@ -94,9 +94,10 @@ class Edges {
* present in this iterator. */
void update_position() {
if (vertex_.local()) {
position_ = std::find_if(
position_, end_,
[v = this->vertex_](const Element &e) { return e.vertex == v; });
position_ = std::find_if(position_,
end_, [v = this->vertex_](const Element &e) {
return e.vertex == v;
});
}
if (edge_types_) {
position_ = std::find_if(position_, end_, [this](const Element &e) {

View File

@ -102,7 +102,7 @@ database::GraphDbAccessor &RecordAccessor<TRecord>::db_accessor() const {
template <typename TRecord>
gid::Gid RecordAccessor<TRecord>::gid() const {
return is_local() ? address_.local()->gid_ : address_.global_id();
return is_local() ? address_.local()->gid_ : address_.gid();
}
template <typename TRecord>
@ -111,6 +111,14 @@ storage::Address<mvcc::VersionList<TRecord>> RecordAccessor<TRecord>::address()
return address_;
}
template <typename TRecord>
storage::Address<mvcc::VersionList<TRecord>>
RecordAccessor<TRecord>::GlobalAddress() const {
return is_local() ? storage::Address<mvcc::VersionList<TRecord>>(
gid(), db_accessor_->db_.WorkerId())
: address_;
}
template <typename TRecord>
RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() {
if (is_local()) {
@ -148,8 +156,8 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
// we need to invalidate the RemoteCache and really get the latest stuff.
// But only do that after the command has been advanced.
db_accessor().template remote_elements<TRecord>().FindSetOldNew(
db_accessor().transaction().id_, address_.worker_id(),
address_.global_id(), old_, new_);
db_accessor().transaction().id_, address_.worker_id(), address_.gid(),
old_, new_);
}
current_ = old_ ? old_ : new_;
return old_ != nullptr || new_ != nullptr;
@ -210,7 +218,7 @@ RecordAccessor<Vertex>::AddressT RecordAccessor<Vertex>::NormalizedAddress(
AddressT address) const {
if (address.is_local()) return address;
if (address.worker_id() == db_accessor().db_.WorkerId()) {
return AddressT(db_accessor().LocalVertexAddress(address.global_id()));
return AddressT(db_accessor().LocalVertexAddress(address.gid()));
}
return address;
@ -221,7 +229,7 @@ RecordAccessor<Edge>::AddressT RecordAccessor<Edge>::NormalizedAddress(
AddressT address) const {
if (address.is_local()) return address;
if (address.worker_id() == db_accessor().db_.WorkerId()) {
return AddressT(db_accessor().LocalEdgeAddress(address.global_id()));
return AddressT(db_accessor().LocalEdgeAddress(address.gid()));
}
return address;

View File

@ -96,6 +96,9 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
AddressT address() const;
// Returns an address which is global - composed of gid and worker_id
AddressT GlobalAddress() const;
/*
* Switches this record accessor to use the latest version visible to the
* current transaction+command. Possibly the one that was created by this

View File

@ -273,6 +273,7 @@ nlohmann::json GetWithDefault(const nlohmann::json &object,
}
int main(int argc, char **argv) {
LOG(FATAL) << "Doesn't work with the newest format - waiting for refactor";
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);

View File

@ -9,7 +9,6 @@
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
@ -17,6 +16,7 @@
#include "durability/paths.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "durability/snapshot_decoder.hpp"
#include "durability/version.hpp"
#include "utils/string.hpp"
@ -72,6 +72,14 @@ class DbGenerator {
return edge;
}
EdgeAccessor InsertCycleEdge() {
auto vertex = RandomVertex();
auto edge =
dba_.InsertEdge(vertex, vertex, EdgeType(RandomInt(kEdgeTypeCount)));
edge_ids_.emplace_back(edge.gid());
return edge;
}
void RemoveEdge() {
auto edge = RandomEdge(true);
dba_.RemoveEdge(edge);
@ -240,6 +248,7 @@ void MakeDb(database::GraphDbAccessor &dba, int scale,
DbGenerator generator{dba};
for (int i = 0; i < scale; i++) generator.InsertVertex();
for (int i = 0; i < scale * 2; i++) generator.InsertEdge();
for (int i = 0; i < scale / 2; i++) generator.InsertCycleEdge();
// Give the WAL some time to flush, we're pumping ops fast here.
std::this_thread::sleep_for(std::chrono::milliseconds(30));
for (int i = 0; i < scale * 3; i++) {
@ -405,7 +414,7 @@ TEST_F(Durability, SnapshotEncoding) {
auto snapshot = GetLastFile(snapshot_dir_);
HashedFileReader buffer;
communication::bolt::Decoder<HashedFileReader> decoder(buffer);
durability::SnapshotDecoder<HashedFileReader> decoder(buffer);
int64_t vertex_count, edge_count;
uint64_t hash;
@ -443,14 +452,13 @@ TEST_F(Durability, SnapshotEncoding) {
EXPECT_EQ(dv.ValueList()[0].ValueString(), "l1");
EXPECT_EQ(dv.ValueList()[1].ValueString(), "p1");
std::map<gid::Gid, communication::bolt::DecodedVertex> decoded_vertices;
std::map<gid::Gid, durability::DecodedSnapshotVertex> decoded_vertices;
// Decode vertices.
for (int i = 0; i < vertex_count; ++i) {
decoder.ReadValue(&dv);
ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Vertex);
auto &vertex = dv.ValueVertex();
decoded_vertices.emplace(vertex.id, vertex);
auto vertex = decoder.ReadSnapshotVertex();
ASSERT_NE(vertex, std::experimental::nullopt);
decoded_vertices.emplace(vertex->gid, *vertex);
}
ASSERT_EQ(decoded_vertices.size(), 3);
ASSERT_EQ(decoded_vertices[gid0].labels.size(), 1);

View File

@ -26,11 +26,11 @@ TEST(Address, Global) {
int worker_id{17};
uint64_t local_id{31};
gid::Generator generator(13);
auto global_id = generator.Next(local_id);
Address<int> address{global_id, worker_id};
auto gid = generator.Next(local_id);
Address<int> address{gid, worker_id};
EXPECT_TRUE(address.is_remote());
EXPECT_FALSE(address.is_local());
EXPECT_EQ(address.global_id(), global_id);
EXPECT_EQ(address.gid(), gid);
EXPECT_EQ(address.worker_id(), worker_id);
}

View File

@ -1,3 +1,4 @@
#include <algorithm>
#include <cstdio>
#include <experimental/filesystem>
#include <experimental/optional>
@ -8,11 +9,12 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "config.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/snapshooter.hpp"
#include "durability/snapshot_decoded_value.hpp"
#include "durability/snapshot_encoder.hpp"
#include "durability/version.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"
@ -172,11 +174,12 @@ std::vector<Field> ReadHeader(std::istream &stream) {
return fields;
}
query::TypedValue StringToTypedValue(const std::string &str,
const std::string &type) {
communication::bolt::DecodedValue StringToDecodedValue(
const std::string &str, const std::string &type) {
// Empty string signifies Null.
if (str.empty()) return query::TypedValue::Null;
auto convert = [](const auto &str, const auto &type) -> query::TypedValue {
if (str.empty()) return communication::bolt::DecodedValue();
auto convert = [](const auto &str,
const auto &type) -> communication::bolt::DecodedValue {
if (type == "int" || type == "long" || type == "byte" || type == "short") {
std::istringstream ss(str);
int64_t val;
@ -190,14 +193,14 @@ query::TypedValue StringToTypedValue(const std::string &str,
return str;
}
LOG(FATAL) << "Unexpected type: " << type;
return query::TypedValue::Null;
return communication::bolt::DecodedValue();
};
// Type *not* ending with '[]', signifies regular value.
if (!utils::EndsWith(type, "[]")) return convert(str, type);
// Otherwise, we have an array type.
auto elem_type = type.substr(0, type.size() - 2);
auto elems = utils::Split(str, FLAGS_array_delimiter);
std::vector<query::TypedValue> array;
std::vector<communication::bolt::DecodedValue> array;
array.reserve(elems.size());
for (const auto &elem : elems) {
array.push_back(convert(utils::Trim(elem), elem_type));
@ -211,13 +214,14 @@ std::string GetIdSpace(const std::string &type) {
return type.substr(start + 1, type.size() - 1);
}
void WriteNodeRow(const std::vector<Field> &fields,
const std::vector<std::string> &row,
MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
void WriteNodeRow(
std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex>
&partial_vertices,
const std::vector<Field> &fields, const std::vector<std::string> &row,
MemgraphNodeIdMap &node_id_map) {
std::experimental::optional<gid::Gid> id;
std::vector<query::TypedValue> labels;
std::map<std::string, query::TypedValue> properties;
std::vector<std::string> labels;
std::map<std::string, communication::bolt::DecodedValue> properties;
for (int i = 0; i < row.size(); ++i) {
const auto &field = fields[i];
auto value = utils::Trim(row[i]);
@ -241,21 +245,16 @@ void WriteNodeRow(const std::vector<Field> &fields,
labels.emplace_back(utils::Trim(label));
}
} else if (field.type != "ignore") {
properties[field.name] = StringToTypedValue(value, field.type);
properties[field.name] = StringToDecodedValue(value, field.type);
}
}
CHECK(id) << "Node ID must be specified";
// write node
encoder.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
3);
encoder.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
encoder.WriteInt(*id);
encoder.WriteList(labels);
encoder.WriteMap(properties);
partial_vertices[*id] = {*id, labels, properties, {}};
}
auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
auto PassNodes(std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex>
&partial_vertices,
const std::string &nodes_path, MemgraphNodeIdMap &node_id_map) {
int64_t node_count = 0;
std::ifstream nodes_file(nodes_path);
CHECK(nodes_file) << fmt::format("Unable to open '{}'", nodes_path);
@ -264,7 +263,7 @@ auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
while (!row.empty()) {
CHECK_EQ(row.size(), fields.size())
<< "Expected as many values as there are header fields";
WriteNodeRow(fields, row, node_id_map, encoder);
WriteNodeRow(partial_vertices, fields, row, node_id_map);
// Increase count and move to next row.
node_count += 1;
row = ReadRow(nodes_file);
@ -273,13 +272,13 @@ auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
}
void WriteRelationshipsRow(
std::unordered_map<gid::Gid, communication::bolt::DecodedEdge> &edges,
const std::vector<Field> &fields, const std::vector<std::string> &row,
const MemgraphNodeIdMap &node_id_map, gid::Gid relationship_id,
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
const MemgraphNodeIdMap &node_id_map, gid::Gid relationship_id) {
std::experimental::optional<int64_t> start_id;
std::experimental::optional<int64_t> end_id;
std::experimental::optional<std::string> relationship_type;
std::map<std::string, query::TypedValue> properties;
std::map<std::string, communication::bolt::DecodedValue> properties;
for (int i = 0; i < row.size(); ++i) {
const auto &field = fields[i];
auto value = utils::Trim(row[i]);
@ -300,27 +299,20 @@ void WriteRelationshipsRow(
<< "Only one relationship TYPE must be specified";
relationship_type = value;
} else if (field.type != "ignore") {
properties[field.name] = StringToTypedValue(value, field.type);
properties[field.name] = StringToDecodedValue(value, field.type);
}
}
CHECK(start_id) << "START_ID must be set";
CHECK(end_id) << "END_ID must be set";
CHECK(relationship_type) << "Relationship TYPE must be set";
// write relationship
encoder.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
5);
encoder.WriteRAW(
underlying_cast(communication::bolt::Signature::Relationship));
encoder.WriteInt(relationship_id);
encoder.WriteInt(*start_id);
encoder.WriteInt(*end_id);
encoder.WriteString(*relationship_type);
encoder.WriteMap(properties);
edges[relationship_id] = {(int64_t)relationship_id, *start_id, *end_id,
*relationship_type, properties};
}
int ConvertRelationships(
int PassRelationships(
std::unordered_map<gid::Gid, communication::bolt::DecodedEdge> &edges,
const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<HashedFileWriter> &encoder,
gid::Generator &relationship_id_generator) {
std::ifstream relationships_file(relationships_path);
CHECK(relationships_file)
@ -331,8 +323,8 @@ int ConvertRelationships(
while (!row.empty()) {
CHECK_EQ(row.size(), fields.size())
<< "Expected as many values as there are header fields";
WriteRelationshipsRow(fields, row, node_id_map,
relationship_id_generator.Next(), encoder);
WriteRelationshipsRow(edges, fields, row, node_id_map,
relationship_id_generator.Next());
++relationships;
row = ReadRow(relationships_file);
}
@ -344,7 +336,7 @@ void Convert(const std::vector<std::string> &nodes,
const std::string &output_path) {
try {
HashedFileWriter buffer(output_path);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
durability::SnapshotEncoder<HashedFileWriter> encoder(buffer);
int64_t node_count = 0;
int64_t edge_count = 0;
gid::Generator relationship_id_generator(0);
@ -375,13 +367,68 @@ void Convert(const std::vector<std::string> &nodes,
encoder.WriteInt(0); // Id of transaction that is snapshooting.
encoder.WriteList({}); // Transactional snapshot.
encoder.WriteList({}); // Label + property indexes.
std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex> vertices;
std::unordered_map<gid::Gid, communication::bolt::DecodedEdge> edges;
for (const auto &nodes_file : nodes) {
node_count += ConvertNodes(nodes_file, node_id_map, encoder);
node_count += PassNodes(vertices, nodes_file, node_id_map);
}
for (const auto &relationships_file : relationships) {
edge_count += ConvertRelationships(relationships_file, node_id_map,
encoder, relationship_id_generator);
edge_count += PassRelationships(edges, relationships_file, node_id_map,
relationship_id_generator);
}
for (auto edge : edges) {
auto encoded = edge.second;
vertices[encoded.from].out.push_back({Edges::EdgeAddress(encoded.id, 0),
Edges::VertexAddress(encoded.to, 0),
encoded.type});
vertices[encoded.to].in.push_back({Edges::EdgeAddress(encoded.id, 0),
Edges::VertexAddress(encoded.from, 0),
encoded.type});
}
for (auto vertex_pair : vertices) {
auto &vertex = vertex_pair.second;
// write node
encoder.WriteRAW(
underlying_cast(communication::bolt::Marker::TinyStruct) + 3);
encoder.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
encoder.WriteInt(vertex.gid);
auto &labels = vertex.labels;
std::vector<query::TypedValue> transformed;
std::transform(
labels.begin(), labels.end(), std::back_inserter(transformed),
[](const std::string &str) -> query::TypedValue { return str; });
encoder.WriteList(transformed);
encoder.WriteMap(vertex.properties);
encoder.WriteInt(vertex.in.size());
for (auto edge : vertex.in) {
encoder.WriteInt(edge.address.raw());
encoder.WriteInt(edge.vertex.raw());
encoder.WriteString(edge.type);
}
encoder.WriteInt(vertex.out.size());
for (auto edge : vertex.out) {
encoder.WriteInt(edge.address.raw());
encoder.WriteInt(edge.vertex.raw());
encoder.WriteString(edge.type);
}
}
for (auto edge_pair : edges) {
auto &edge = edge_pair.second;
// write relationship
encoder.WriteRAW(
underlying_cast(communication::bolt::Marker::TinyStruct) + 5);
encoder.WriteRAW(
underlying_cast(communication::bolt::Signature::Relationship));
encoder.WriteInt(edge.id);
encoder.WriteInt(edge.from);
encoder.WriteInt(edge.to);
encoder.WriteString(edge.type);
encoder.WriteMap(edge.properties);
}
buffer.WriteValue(node_count);
buffer.WriteValue(edge_count);
buffer.WriteValue(buffer.hash());