Edges iterator implementation

Summary:
`EdgesIterable` is used for iterating over Edges in distributed memgraph. Because of lru cache there is a possibility of data getting evicted as someone iterates over it.
To prevent that `EdgesIterable` will lock that data and release it when it's deconstructed.

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1868
This commit is contained in:
Vinko Kasljevic 2019-02-26 08:41:16 +01:00
parent 4d1d9fb15a
commit bbb69a1c00
10 changed files with 425 additions and 69 deletions

View File

@ -162,6 +162,7 @@ set(mg_distributed_sources
storage/common/types/property_value_store.cpp
storage/common/types/slk.cpp
storage/distributed/edge_accessor.cpp
storage/distributed/edges_iterator.cpp
storage/distributed/record_accessor.cpp
storage/distributed/rpc/serialization.cpp
storage/distributed/vertex_accessor.cpp

View File

@ -13,6 +13,9 @@
#include "glog/logging.h"
#include <cppitertools/chain.hpp>
#include <cppitertools/imap.hpp>
#include "auth/auth.hpp"
#include "communication/result_stream_faker.hpp"
#include "database/graph_db_accessor.hpp"

View File

@ -18,7 +18,7 @@
* vertex (and consequently that edge Addresses are unique in it).
*/
class Edges {
private:
public:
struct Element {
storage::VertexAddress vertex;
storage::EdgeAddress edge;
@ -105,7 +105,6 @@ class Edges {
}
};
public:
/**
* Adds an edge to this structure.
*

View File

@ -0,0 +1,55 @@
#include "storage/distributed/edges_iterator.hpp"
#include "storage/distributed/vertex_accessor.hpp"
void EdgeAccessorIterator::CreateOut(const Edges::Element &e) {
edge_accessor_.emplace(e.edge, va_->db_accessor(), va_->address(), e.vertex,
e.edge_type);
}
void EdgeAccessorIterator::CreateIn(const Edges::Element &e) {
edge_accessor_.emplace(e.edge, va_->db_accessor(), e.vertex, va_->address(),
e.edge_type);
}
EdgesIterable::EdgesIterable(
const VertexAccessor &va, bool from, const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types) {
auto sptr = std::make_shared<VertexAccessor>(va);
begin_.emplace(GetBegin(sptr, from, dest.address(), edge_types));
end_.emplace(GetEnd(sptr, from));
}
EdgesIterable::EdgesIterable(
const VertexAccessor &va, bool from,
const std::vector<storage::EdgeType> *edge_types) {
auto sptr = std::make_shared<VertexAccessor>(va);
begin_.emplace(GetBegin(sptr, from, std::experimental::nullopt, edge_types));
end_.emplace(GetEnd(sptr, from));
}
EdgeAccessorIterator EdgesIterable::GetBegin(
std::shared_ptr<VertexAccessor> va, bool from,
std::experimental::optional<storage::VertexAddress> dest,
const std::vector<storage::EdgeType> *edge_types) {
const Edges *edges;
if (from) {
edges = &va->current().out_;
} else {
edges = &va->current().in_;
}
return EdgeAccessorIterator(edges->begin(dest, edge_types), va, from);
};
EdgeAccessorIterator EdgesIterable::GetEnd(std::shared_ptr<VertexAccessor> va,
bool from) {
if (from) {
auto iter = va->current().out_.end();
return EdgeAccessorIterator(iter, va, from);
} else {
auto iter = va->current().in_.end();
return EdgeAccessorIterator(iter, va, from);
}
};

View File

@ -0,0 +1,123 @@
/// @file
#pragma once
#include <experimental/optional>
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/edges.hpp"
#include "storage/distributed/vertex.hpp"
class VertexAccessor;
/// EdgeAccessorIterator is a forward iterator that returns EdgeAccessor for
/// every edge in a Vertex. It also makes sure to keep the data it iterates over
/// alive, this is important because of the LRU cache that evicts data when its
/// full.
class EdgeAccessorIterator {
public:
EdgeAccessorIterator(Edges::Iterator iter,
const std::shared_ptr<VertexAccessor> &va, bool from)
: va_(va), iter_(iter), from_(from) {}
EdgeAccessorIterator(const EdgeAccessorIterator &other)
: va_(other.va_), iter_(other.iter_), from_(other.from_) {}
EdgeAccessorIterator &operator=(const EdgeAccessorIterator &other) {
va_ = other.va_;
iter_ = other.iter_;
from_ = other.from_;
return *this;
}
EdgeAccessorIterator &operator++() {
++iter_;
ResetAccessor();
return *this;
}
EdgeAccessorIterator operator++(int) {
auto old = *this;
++iter_;
ResetAccessor();
return old;
}
EdgeAccessor &operator*() {
UpdateAccessor();
return *edge_accessor_;
}
EdgeAccessor *operator->() {
UpdateAccessor();
return &edge_accessor_.value();
}
bool operator==(const EdgeAccessorIterator &other) const {
return iter_ == other.iter_;
}
bool operator!=(const EdgeAccessorIterator &other) const {
return !(*this == other);
}
private:
void UpdateAccessor() {
if (edge_accessor_) return;
if (from_) {
CreateOut(*iter_);
} else {
CreateIn(*iter_);
}
}
void ResetAccessor() { edge_accessor_ = std::experimental::nullopt; }
void CreateOut(const Edges::Element &e);
void CreateIn(const Edges::Element &e);
std::shared_ptr<VertexAccessor> va_;
std::experimental::optional<EdgeAccessor> edge_accessor_;
Edges::Iterator iter_;
bool from_;
};
/// EdgesIterable constains begin and end iterator to edges stored in vertex.
/// It has constructors that can be used to create iterators that skip over
/// some elements.
class EdgesIterable {
public:
/// Creates new iterable that will skip edges whose destination vertex is not
/// equal to dest and whose type is not in edge_types.
///
/// @param dest - the destination vertex address, if empty the edges will not
/// be filtered on destination
/// @param edge_types - the edge types at least one of which must be matched,
/// if nullptr edges are not filtered on type
EdgesIterable(const VertexAccessor &va, bool from,
const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types = nullptr);
/// Creates new iterable that will skip edges whose type is not in edge_types.
///
/// @param edge_types - the edge types at least one of which must be matched,
/// if nullptr edges are not filtered on type
EdgesIterable(const VertexAccessor &va, bool from,
const std::vector<storage::EdgeType> *edge_types = nullptr);
EdgeAccessorIterator begin() { return *begin_; };
EdgeAccessorIterator end() { return *end_; };
private:
EdgeAccessorIterator GetBegin(
std::shared_ptr<VertexAccessor> va, bool from,
std::experimental::optional<storage::VertexAddress> dest,
const std::vector<storage::EdgeType> *edge_types = nullptr);
EdgeAccessorIterator GetEnd(std::shared_ptr<VertexAccessor> va, bool from);
std::experimental::optional<EdgeAccessorIterator> begin_;
std::experimental::optional<EdgeAccessorIterator> end_;
};

View File

@ -171,6 +171,10 @@ class RecordAccessor {
*/
void ReleaseCachedData() const;
/** Returns the current version (either new_ or old_) set on this
* RecordAccessor. */
const TRecord &current() const;
protected:
/**
* The database::GraphDbAccessor is friend to this accessor so it can
@ -199,10 +203,6 @@ class RecordAccessor {
// TODO (vkasljevic) remove this
mutable TRecord *current_{nullptr};
/** Returns the current version (either new_ or old_) set on this
* RecordAccessor. */
const TRecord &current() const;
private:
// The database accessor for which this record accessor is created
// Provides means of getting to the transaction and database functions.

View File

@ -5,14 +5,11 @@
#include <set>
#include <vector>
#include <cppitertools/chain.hpp>
#include <cppitertools/imap.hpp>
#include "storage/distributed/cached_data_lock.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/edges_iterator.hpp"
#include "storage/distributed/record_accessor.hpp"
#include "storage/distributed/vertex.hpp"
#include "utils/algorithm.hpp"
/**
* Provides ways for the client programmer (i.e. code generated
@ -23,32 +20,6 @@
*/
class VertexAccessor final : public RecordAccessor<Vertex> {
using VertexAddress = storage::Address<mvcc::VersionList<Vertex>>;
// Helper function for creating an iterator over edges.
// @param begin - begin iterator
// @param end - end iterator
// @param from - if true specifies that the vertex represents `from` part of
// the edge, otherwise it specifies `to` part of the edge
// @param vertex - one endpoint of every edge
// @param db_accessor - database accessor
// @return - Iterator over EdgeAccessors
template <typename TIterator>
static inline auto MakeAccessorIterator(
TIterator &&begin, TIterator &&end, bool from, VertexAddress vertex,
database::GraphDbAccessor &db_accessor) {
return iter::imap(
[from, vertex, &db_accessor](auto &edges_element) {
if (from) {
return EdgeAccessor(edges_element.edge, db_accessor, vertex,
edges_element.vertex, edges_element.edge_type);
} else {
return EdgeAccessor(edges_element.edge, db_accessor,
edges_element.vertex, vertex,
edges_element.edge_type);
}
},
utils::Iterable<TIterator>(std::forward<TIterator>(begin),
std::forward<TIterator>(end)));
}
public:
VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor);
@ -74,10 +45,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
std::vector<storage::Label> labels() const;
/** Returns EdgeAccessors for all incoming edges. */
auto in() const {
EdgesIterable in() const {
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(current().in_.begin(), current().in_.end(),
false, address(), db_accessor());
return EdgesIterable(*this, false);
}
/**
@ -87,14 +57,10 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
* @param edge_types - Edge types filter. At least one be matched. If nullptr
* or empty, the parameter is ignored.
*/
auto in(const VertexAccessor &dest,
EdgesIterable in(const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types = nullptr) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(current().in_.begin(dest.address(), edge_types),
current().in_.end(), false, address(),
db_accessor());
return EdgesIterable(*this, false, dest, edge_types);
}
/**
@ -103,22 +69,15 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
* @param edge_types - Edge types filter. At least one be matched. If nullptr
* or empty, the parameter is ignored.
*/
auto in(const std::vector<storage::EdgeType> *edge_types) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
EdgesIterable in(const std::vector<storage::EdgeType> *edge_types) const {
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(
current().in_.begin(std::experimental::nullopt, edge_types),
current().in_.end(), false, address(), db_accessor());
return EdgesIterable(*this, false, edge_types);
}
/** Returns EdgeAccessors for all outgoing edges. */
auto out() const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
EdgesIterable out() const {
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(current().out_.begin(), current().out_.end(),
true, address(), db_accessor());
return EdgesIterable(*this, true);
}
/**
@ -129,14 +88,10 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
* @param edge_types - Edge types filter. At least one be matched. If nullptr
* or empty, the parameter is ignored.
*/
auto out(const VertexAccessor &dest,
EdgesIterable out(const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types = nullptr) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(
current().out_.begin(dest.address(), edge_types), current().out_.end(),
true, address(), db_accessor());
return EdgesIterable(*this, true, dest, edge_types);
}
/**
@ -145,13 +100,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
* @param edge_types - Edge types filter. At least one be matched. If nullptr
* or empty, the parameter is ignored.
*/
auto out(const std::vector<storage::EdgeType> *edge_types) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
EdgesIterable out(const std::vector<storage::EdgeType> *edge_types) const {
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(
current().out_.begin(std::experimental::nullopt, edge_types),
current().out_.end(), true, address(), db_accessor());
return EdgesIterable(*this, true, edge_types);
}
/** Removes the given edge from the outgoing edges of this vertex. Note that

View File

@ -79,6 +79,9 @@ target_link_libraries(${test_prefix}distributed_durability mg-distributed kvstor
add_unit_test(distributed_dynamic_worker.cpp)
target_link_libraries(${test_prefix}distributed_dynamic_worker mg-distributed kvstore_dummy_lib)
add_unit_test(distributed_edges_iterator.cpp)
target_link_libraries(${test_prefix}distributed_edges_iterator mg-distributed kvstore_dummy_lib)
add_unit_test(distributed_gc.cpp)
target_link_libraries(${test_prefix}distributed_gc mg-distributed kvstore_dummy_lib)

View File

@ -0,0 +1,217 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "storage/distributed/edges_iterator.hpp"
#include "distributed_common.hpp"
class EdgesIterableTest : public DistributedGraphDbTest {
public:
EdgesIterableTest() : DistributedGraphDbTest("edges") {}
void SetUp() override {
DistributedGraphDbTest::SetUp();
v.emplace(InsertVertex(master()));
w1_v1_out.emplace(InsertVertex(worker(1)));
w1_v2_out.emplace(InsertVertex(worker(1)));
w1_v3_out.emplace(InsertVertex(worker(1)));
w1_v1_in.emplace(InsertVertex(worker(1)));
w1_v2_in.emplace(InsertVertex(worker(1)));
w1_v3_in.emplace(InsertVertex(worker(1)));
w1_e1_out.emplace(InsertEdge(*v, *w1_v1_out, type1));
w1_e1_in.emplace(InsertEdge(*w1_v1_in, *v, type1));
w1_e2_out.emplace(InsertEdge(*v, *w1_v2_out, type1));
w1_e2_in.emplace(InsertEdge(*w1_v2_in, *v, type1));
w1_e3_out.emplace(InsertEdge(*v, *w1_v3_out, type2));
w1_e3_in.emplace(InsertEdge(*w1_v3_in, *v, type2));
w2_v1_out.emplace(InsertVertex(worker(2)));
w2_v2_out.emplace(InsertVertex(worker(2)));
w2_v3_out.emplace(InsertVertex(worker(2)));
w2_v1_in.emplace(InsertVertex(worker(2)));
w2_v2_in.emplace(InsertVertex(worker(2)));
w2_v3_in.emplace(InsertVertex(worker(2)));
w2_e1_out.emplace(InsertEdge(*v, *w2_v1_out, type1));
w2_e1_in.emplace(InsertEdge(*w2_v1_in, *v, type1));
w2_e2_out.emplace(InsertEdge(*v, *w2_v2_out, type1));
w2_e2_in.emplace(InsertEdge(*w2_v2_in, *v, type1));
w2_e3_out.emplace(InsertEdge(*v, *w2_v3_out, type2));
w2_e3_in.emplace(InsertEdge(*w2_v3_in, *v, type2));
}
// master
std::experimental::optional<storage::VertexAddress> v;
// worker 1 vertices
std::experimental::optional<storage::VertexAddress> w1_v1_out;
std::experimental::optional<storage::VertexAddress> w1_v2_out;
std::experimental::optional<storage::VertexAddress> w1_v3_out;
std::experimental::optional<storage::VertexAddress> w1_v1_in;
std::experimental::optional<storage::VertexAddress> w1_v2_in;
std::experimental::optional<storage::VertexAddress> w1_v3_in;
// worker 1 edges
std::experimental::optional<storage::EdgeAddress> w1_e1_out;
std::experimental::optional<storage::EdgeAddress> w1_e2_out;
std::experimental::optional<storage::EdgeAddress> w1_e3_out;
std::experimental::optional<storage::EdgeAddress> w1_e1_in;
std::experimental::optional<storage::EdgeAddress> w1_e2_in;
std::experimental::optional<storage::EdgeAddress> w1_e3_in;
// worker 2 vertices
std::experimental::optional<storage::VertexAddress> w2_v1_out;
std::experimental::optional<storage::VertexAddress> w2_v2_out;
std::experimental::optional<storage::VertexAddress> w2_v3_out;
std::experimental::optional<storage::VertexAddress> w2_v1_in;
std::experimental::optional<storage::VertexAddress> w2_v2_in;
std::experimental::optional<storage::VertexAddress> w2_v3_in;
// worker 2 edges
std::experimental::optional<storage::EdgeAddress> w2_e1_out;
std::experimental::optional<storage::EdgeAddress> w2_e2_out;
std::experimental::optional<storage::EdgeAddress> w2_e3_out;
std::experimental::optional<storage::EdgeAddress> w2_e1_in;
std::experimental::optional<storage::EdgeAddress> w2_e2_in;
std::experimental::optional<storage::EdgeAddress> w2_e3_in;
// types
std::string type1{"type1"};
std::string type2{"type2"};
};
TEST_F(EdgesIterableTest, OutEdges) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
auto iterable = va.out();
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e1_out);
++i;
EXPECT_EQ(i->GlobalAddress(), *w1_e2_out);
++i;
EXPECT_EQ(i->GlobalAddress(), *w1_e3_out);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e1_out);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e2_out);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e3_out);
++i;
EXPECT_EQ(iterable.end(), i);
}
TEST_F(EdgesIterableTest, InEdges) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
auto iterable = va.in();
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e1_in);
++i;
EXPECT_EQ(i->GlobalAddress(), *w1_e2_in);
++i;
EXPECT_EQ(i->GlobalAddress(), *w1_e3_in);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e1_in);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e2_in);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e3_in);
++i;
EXPECT_EQ(iterable.end(), i);
}
TEST_F(EdgesIterableTest, InEdgesDestAddrFilter) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
{
VertexAccessor va_dest(*w1_v2_in, *dba);
auto iterable = va.in(va_dest);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e2_in);
++i;
EXPECT_EQ(iterable.end(), i);
}
{
VertexAccessor va_dest(*w2_v1_in, *dba);
auto iterable = va.in(va_dest);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w2_e1_in);
++i;
EXPECT_EQ(iterable.end(), i);
}
}
TEST_F(EdgesIterableTest, OutEdgesDestAddrFilter) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
{
VertexAccessor va_dest(*w1_v2_out, *dba);
auto iterable = va.out(va_dest);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e2_out);
++i;
EXPECT_EQ(iterable.end(), i);
}
{
VertexAccessor va_dest(*w2_v1_out, *dba);
auto iterable = va.out(va_dest);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w2_e1_out);
++i;
EXPECT_EQ(iterable.end(), i);
}
}
TEST_F(EdgesIterableTest, InEdgesEdgeTypeFilter) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
std::vector<storage::EdgeType> edge_types{dba->EdgeType(type2)};
auto iterable = va.in(&edge_types);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e3_in);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e3_in);
++i;
EXPECT_EQ(iterable.end(), i);
}
TEST_F(EdgesIterableTest, OutEdgesEdgeTypeFilter) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
std::vector<storage::EdgeType> edge_types{dba->EdgeType(type2)};
auto iterable = va.out(&edge_types);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e3_out);
++i;
EXPECT_EQ(i->GlobalAddress(), *w2_e3_out);
++i;
EXPECT_EQ(iterable.end(), i);
}
TEST_F(EdgesIterableTest, InEdgesEdgeTypeAndDestFilter) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
VertexAccessor va_dest(*w1_v1_in, *dba);
std::vector<storage::EdgeType> edge_types{dba->EdgeType(type1)};
auto iterable = va.in(va_dest, &edge_types);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e1_in);
++i;
EXPECT_EQ(iterable.end(), i);
}
TEST_F(EdgesIterableTest, OutEdgesEdgeTypeAndDestFilter) {
auto dba = master().Access();
VertexAccessor va(*v, *dba);
VertexAccessor va_dest(*w1_v1_out, *dba);
std::vector<storage::EdgeType> edge_types{dba->EdgeType(type1)};
auto iterable = va.out(va_dest, &edge_types);
auto i = iterable.begin();
EXPECT_EQ(i->GlobalAddress(), *w1_e1_out);
++i;
EXPECT_EQ(iterable.end(), i);
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
google::InitGoogleLogging(argv[0]);
return RUN_ALL_TESTS();
}

View File

@ -511,7 +511,11 @@ class DistributedEdgeRemoveTest : public DistributedGraphDbTest {
template <typename TIterable>
auto Size(TIterable iterable) {
return std::distance(iterable.begin(), iterable.end());
size_t size = 0;
for (auto i = iterable.begin(); i != iterable.end(); ++i) {
++size;
}
return size;
};
void CheckCreation() {