diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cef7d41b2..5bd805a86 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -162,6 +162,7 @@ set(mg_distributed_sources storage/common/types/property_value_store.cpp storage/common/types/slk.cpp storage/distributed/edge_accessor.cpp + storage/distributed/edges_iterator.cpp storage/distributed/record_accessor.cpp storage/distributed/rpc/serialization.cpp storage/distributed/vertex_accessor.cpp diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 311a3e88a..d1ecc11b7 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -13,6 +13,9 @@ #include "glog/logging.h" +#include +#include + #include "auth/auth.hpp" #include "communication/result_stream_faker.hpp" #include "database/graph_db_accessor.hpp" diff --git a/src/storage/distributed/edges.hpp b/src/storage/distributed/edges.hpp index 891e0a85a..ff67e07ca 100644 --- a/src/storage/distributed/edges.hpp +++ b/src/storage/distributed/edges.hpp @@ -18,7 +18,7 @@ * vertex (and consequently that edge Addresses are unique in it). */ class Edges { - private: + public: struct Element { storage::VertexAddress vertex; storage::EdgeAddress edge; @@ -105,7 +105,6 @@ class Edges { } }; - public: /** * Adds an edge to this structure. * diff --git a/src/storage/distributed/edges_iterator.cpp b/src/storage/distributed/edges_iterator.cpp new file mode 100644 index 000000000..f7c2b19c7 --- /dev/null +++ b/src/storage/distributed/edges_iterator.cpp @@ -0,0 +1,55 @@ +#include "storage/distributed/edges_iterator.hpp" + +#include "storage/distributed/vertex_accessor.hpp" + +void EdgeAccessorIterator::CreateOut(const Edges::Element &e) { + edge_accessor_.emplace(e.edge, va_->db_accessor(), va_->address(), e.vertex, + e.edge_type); +} + +void EdgeAccessorIterator::CreateIn(const Edges::Element &e) { + edge_accessor_.emplace(e.edge, va_->db_accessor(), e.vertex, va_->address(), + e.edge_type); +} + +EdgesIterable::EdgesIterable( + const VertexAccessor &va, bool from, const VertexAccessor &dest, + const std::vector *edge_types) { + auto sptr = std::make_shared(va); + begin_.emplace(GetBegin(sptr, from, dest.address(), edge_types)); + end_.emplace(GetEnd(sptr, from)); +} + +EdgesIterable::EdgesIterable( + const VertexAccessor &va, bool from, + const std::vector *edge_types) { + auto sptr = std::make_shared(va); + begin_.emplace(GetBegin(sptr, from, std::experimental::nullopt, edge_types)); + end_.emplace(GetEnd(sptr, from)); +} + +EdgeAccessorIterator EdgesIterable::GetBegin( + std::shared_ptr va, bool from, + std::experimental::optional dest, + const std::vector *edge_types) { + const Edges *edges; + + if (from) { + edges = &va->current().out_; + } else { + edges = &va->current().in_; + } + + return EdgeAccessorIterator(edges->begin(dest, edge_types), va, from); +}; + +EdgeAccessorIterator EdgesIterable::GetEnd(std::shared_ptr va, + bool from) { + if (from) { + auto iter = va->current().out_.end(); + return EdgeAccessorIterator(iter, va, from); + } else { + auto iter = va->current().in_.end(); + return EdgeAccessorIterator(iter, va, from); + } +}; diff --git a/src/storage/distributed/edges_iterator.hpp b/src/storage/distributed/edges_iterator.hpp new file mode 100644 index 000000000..7e7f09855 --- /dev/null +++ b/src/storage/distributed/edges_iterator.hpp @@ -0,0 +1,123 @@ +/// @file + +#pragma once + +#include + +#include "storage/distributed/edge_accessor.hpp" +#include "storage/distributed/edges.hpp" +#include "storage/distributed/vertex.hpp" + +class VertexAccessor; +/// EdgeAccessorIterator is a forward iterator that returns EdgeAccessor for +/// every edge in a Vertex. It also makes sure to keep the data it iterates over +/// alive, this is important because of the LRU cache that evicts data when its +/// full. +class EdgeAccessorIterator { + public: + EdgeAccessorIterator(Edges::Iterator iter, + const std::shared_ptr &va, bool from) + : va_(va), iter_(iter), from_(from) {} + + EdgeAccessorIterator(const EdgeAccessorIterator &other) + : va_(other.va_), iter_(other.iter_), from_(other.from_) {} + + EdgeAccessorIterator &operator=(const EdgeAccessorIterator &other) { + va_ = other.va_; + iter_ = other.iter_; + from_ = other.from_; + return *this; + } + + EdgeAccessorIterator &operator++() { + ++iter_; + ResetAccessor(); + return *this; + } + + EdgeAccessorIterator operator++(int) { + auto old = *this; + ++iter_; + ResetAccessor(); + return old; + } + + EdgeAccessor &operator*() { + UpdateAccessor(); + return *edge_accessor_; + } + + EdgeAccessor *operator->() { + UpdateAccessor(); + return &edge_accessor_.value(); + } + + bool operator==(const EdgeAccessorIterator &other) const { + return iter_ == other.iter_; + } + + bool operator!=(const EdgeAccessorIterator &other) const { + return !(*this == other); + } + + private: + void UpdateAccessor() { + if (edge_accessor_) return; + + if (from_) { + CreateOut(*iter_); + } else { + CreateIn(*iter_); + } + } + + void ResetAccessor() { edge_accessor_ = std::experimental::nullopt; } + + void CreateOut(const Edges::Element &e); + + void CreateIn(const Edges::Element &e); + + std::shared_ptr va_; + std::experimental::optional edge_accessor_; + Edges::Iterator iter_; + bool from_; +}; + +/// EdgesIterable constains begin and end iterator to edges stored in vertex. +/// It has constructors that can be used to create iterators that skip over +/// some elements. +class EdgesIterable { + public: + /// Creates new iterable that will skip edges whose destination vertex is not + /// equal to dest and whose type is not in edge_types. + /// + /// @param dest - the destination vertex address, if empty the edges will not + /// be filtered on destination + /// @param edge_types - the edge types at least one of which must be matched, + /// if nullptr edges are not filtered on type + EdgesIterable(const VertexAccessor &va, bool from, + const VertexAccessor &dest, + const std::vector *edge_types = nullptr); + + /// Creates new iterable that will skip edges whose type is not in edge_types. + /// + /// @param edge_types - the edge types at least one of which must be matched, + /// if nullptr edges are not filtered on type + EdgesIterable(const VertexAccessor &va, bool from, + const std::vector *edge_types = nullptr); + + EdgeAccessorIterator begin() { return *begin_; }; + + EdgeAccessorIterator end() { return *end_; }; + + private: + EdgeAccessorIterator GetBegin( + std::shared_ptr va, bool from, + std::experimental::optional dest, + const std::vector *edge_types = nullptr); + + EdgeAccessorIterator GetEnd(std::shared_ptr va, bool from); + + std::experimental::optional begin_; + std::experimental::optional end_; +}; diff --git a/src/storage/distributed/record_accessor.hpp b/src/storage/distributed/record_accessor.hpp index e4aaedc8d..1bf62dba7 100644 --- a/src/storage/distributed/record_accessor.hpp +++ b/src/storage/distributed/record_accessor.hpp @@ -171,6 +171,10 @@ class RecordAccessor { */ void ReleaseCachedData() const; + /** Returns the current version (either new_ or old_) set on this + * RecordAccessor. */ + const TRecord ¤t() const; + protected: /** * The database::GraphDbAccessor is friend to this accessor so it can @@ -199,10 +203,6 @@ class RecordAccessor { // TODO (vkasljevic) remove this mutable TRecord *current_{nullptr}; - /** Returns the current version (either new_ or old_) set on this - * RecordAccessor. */ - const TRecord ¤t() const; - private: // The database accessor for which this record accessor is created // Provides means of getting to the transaction and database functions. diff --git a/src/storage/distributed/vertex_accessor.hpp b/src/storage/distributed/vertex_accessor.hpp index 2ec032ef2..0a13aa19f 100644 --- a/src/storage/distributed/vertex_accessor.hpp +++ b/src/storage/distributed/vertex_accessor.hpp @@ -5,14 +5,11 @@ #include #include -#include -#include - #include "storage/distributed/cached_data_lock.hpp" #include "storage/distributed/edge_accessor.hpp" +#include "storage/distributed/edges_iterator.hpp" #include "storage/distributed/record_accessor.hpp" #include "storage/distributed/vertex.hpp" -#include "utils/algorithm.hpp" /** * Provides ways for the client programmer (i.e. code generated @@ -23,32 +20,6 @@ */ class VertexAccessor final : public RecordAccessor { using VertexAddress = storage::Address>; - // Helper function for creating an iterator over edges. - // @param begin - begin iterator - // @param end - end iterator - // @param from - if true specifies that the vertex represents `from` part of - // the edge, otherwise it specifies `to` part of the edge - // @param vertex - one endpoint of every edge - // @param db_accessor - database accessor - // @return - Iterator over EdgeAccessors - template - static inline auto MakeAccessorIterator( - TIterator &&begin, TIterator &&end, bool from, VertexAddress vertex, - database::GraphDbAccessor &db_accessor) { - return iter::imap( - [from, vertex, &db_accessor](auto &edges_element) { - if (from) { - return EdgeAccessor(edges_element.edge, db_accessor, vertex, - edges_element.vertex, edges_element.edge_type); - } else { - return EdgeAccessor(edges_element.edge, db_accessor, - edges_element.vertex, vertex, - edges_element.edge_type); - } - }, - utils::Iterable(std::forward(begin), - std::forward(end))); - } public: VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor); @@ -74,10 +45,9 @@ class VertexAccessor final : public RecordAccessor { std::vector labels() const; /** Returns EdgeAccessors for all incoming edges. */ - auto in() const { + EdgesIterable in() const { auto guard = storage::GetDataLock(*this); - return MakeAccessorIterator(current().in_.begin(), current().in_.end(), - false, address(), db_accessor()); + return EdgesIterable(*this, false); } /** @@ -87,14 +57,10 @@ class VertexAccessor final : public RecordAccessor { * @param edge_types - Edge types filter. At least one be matched. If nullptr * or empty, the parameter is ignored. */ - auto in(const VertexAccessor &dest, + EdgesIterable in(const VertexAccessor &dest, const std::vector *edge_types = nullptr) const { - // This is temporary - // TODO (vkasljevic) prepare iterators for lru cache auto guard = storage::GetDataLock(*this); - return MakeAccessorIterator(current().in_.begin(dest.address(), edge_types), - current().in_.end(), false, address(), - db_accessor()); + return EdgesIterable(*this, false, dest, edge_types); } /** @@ -103,22 +69,15 @@ class VertexAccessor final : public RecordAccessor { * @param edge_types - Edge types filter. At least one be matched. If nullptr * or empty, the parameter is ignored. */ - auto in(const std::vector *edge_types) const { - // This is temporary - // TODO (vkasljevic) prepare iterators for lru cache + EdgesIterable in(const std::vector *edge_types) const { auto guard = storage::GetDataLock(*this); - return MakeAccessorIterator( - current().in_.begin(std::experimental::nullopt, edge_types), - current().in_.end(), false, address(), db_accessor()); + return EdgesIterable(*this, false, edge_types); } /** Returns EdgeAccessors for all outgoing edges. */ - auto out() const { - // This is temporary - // TODO (vkasljevic) prepare iterators for lru cache + EdgesIterable out() const { auto guard = storage::GetDataLock(*this); - return MakeAccessorIterator(current().out_.begin(), current().out_.end(), - true, address(), db_accessor()); + return EdgesIterable(*this, true); } /** @@ -129,14 +88,10 @@ class VertexAccessor final : public RecordAccessor { * @param edge_types - Edge types filter. At least one be matched. If nullptr * or empty, the parameter is ignored. */ - auto out(const VertexAccessor &dest, + EdgesIterable out(const VertexAccessor &dest, const std::vector *edge_types = nullptr) const { - // This is temporary - // TODO (vkasljevic) prepare iterators for lru cache auto guard = storage::GetDataLock(*this); - return MakeAccessorIterator( - current().out_.begin(dest.address(), edge_types), current().out_.end(), - true, address(), db_accessor()); + return EdgesIterable(*this, true, dest, edge_types); } /** @@ -145,13 +100,9 @@ class VertexAccessor final : public RecordAccessor { * @param edge_types - Edge types filter. At least one be matched. If nullptr * or empty, the parameter is ignored. */ - auto out(const std::vector *edge_types) const { - // This is temporary - // TODO (vkasljevic) prepare iterators for lru cache + EdgesIterable out(const std::vector *edge_types) const { auto guard = storage::GetDataLock(*this); - return MakeAccessorIterator( - current().out_.begin(std::experimental::nullopt, edge_types), - current().out_.end(), true, address(), db_accessor()); + return EdgesIterable(*this, true, edge_types); } /** Removes the given edge from the outgoing edges of this vertex. Note that diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 722b3b079..e29c715d6 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -79,6 +79,9 @@ target_link_libraries(${test_prefix}distributed_durability mg-distributed kvstor add_unit_test(distributed_dynamic_worker.cpp) target_link_libraries(${test_prefix}distributed_dynamic_worker mg-distributed kvstore_dummy_lib) +add_unit_test(distributed_edges_iterator.cpp) +target_link_libraries(${test_prefix}distributed_edges_iterator mg-distributed kvstore_dummy_lib) + add_unit_test(distributed_gc.cpp) target_link_libraries(${test_prefix}distributed_gc mg-distributed kvstore_dummy_lib) diff --git a/tests/unit/distributed_edges_iterator.cpp b/tests/unit/distributed_edges_iterator.cpp new file mode 100644 index 000000000..09c8d89e5 --- /dev/null +++ b/tests/unit/distributed_edges_iterator.cpp @@ -0,0 +1,217 @@ +#include +#include +#include + +#include "storage/distributed/edges_iterator.hpp" +#include "distributed_common.hpp" + +class EdgesIterableTest : public DistributedGraphDbTest { + public: + EdgesIterableTest() : DistributedGraphDbTest("edges") {} + + void SetUp() override { + DistributedGraphDbTest::SetUp(); + v.emplace(InsertVertex(master())); + w1_v1_out.emplace(InsertVertex(worker(1))); + w1_v2_out.emplace(InsertVertex(worker(1))); + w1_v3_out.emplace(InsertVertex(worker(1))); + w1_v1_in.emplace(InsertVertex(worker(1))); + w1_v2_in.emplace(InsertVertex(worker(1))); + w1_v3_in.emplace(InsertVertex(worker(1))); + w1_e1_out.emplace(InsertEdge(*v, *w1_v1_out, type1)); + w1_e1_in.emplace(InsertEdge(*w1_v1_in, *v, type1)); + w1_e2_out.emplace(InsertEdge(*v, *w1_v2_out, type1)); + w1_e2_in.emplace(InsertEdge(*w1_v2_in, *v, type1)); + w1_e3_out.emplace(InsertEdge(*v, *w1_v3_out, type2)); + w1_e3_in.emplace(InsertEdge(*w1_v3_in, *v, type2)); + w2_v1_out.emplace(InsertVertex(worker(2))); + w2_v2_out.emplace(InsertVertex(worker(2))); + w2_v3_out.emplace(InsertVertex(worker(2))); + w2_v1_in.emplace(InsertVertex(worker(2))); + w2_v2_in.emplace(InsertVertex(worker(2))); + w2_v3_in.emplace(InsertVertex(worker(2))); + w2_e1_out.emplace(InsertEdge(*v, *w2_v1_out, type1)); + w2_e1_in.emplace(InsertEdge(*w2_v1_in, *v, type1)); + w2_e2_out.emplace(InsertEdge(*v, *w2_v2_out, type1)); + w2_e2_in.emplace(InsertEdge(*w2_v2_in, *v, type1)); + w2_e3_out.emplace(InsertEdge(*v, *w2_v3_out, type2)); + w2_e3_in.emplace(InsertEdge(*w2_v3_in, *v, type2)); + } + + // master + std::experimental::optional v; + + // worker 1 vertices + std::experimental::optional w1_v1_out; + std::experimental::optional w1_v2_out; + std::experimental::optional w1_v3_out; + std::experimental::optional w1_v1_in; + std::experimental::optional w1_v2_in; + std::experimental::optional w1_v3_in; + + // worker 1 edges + std::experimental::optional w1_e1_out; + std::experimental::optional w1_e2_out; + std::experimental::optional w1_e3_out; + std::experimental::optional w1_e1_in; + std::experimental::optional w1_e2_in; + std::experimental::optional w1_e3_in; + + // worker 2 vertices + std::experimental::optional w2_v1_out; + std::experimental::optional w2_v2_out; + std::experimental::optional w2_v3_out; + std::experimental::optional w2_v1_in; + std::experimental::optional w2_v2_in; + std::experimental::optional w2_v3_in; + + // worker 2 edges + std::experimental::optional w2_e1_out; + std::experimental::optional w2_e2_out; + std::experimental::optional w2_e3_out; + std::experimental::optional w2_e1_in; + std::experimental::optional w2_e2_in; + std::experimental::optional w2_e3_in; + + // types + std::string type1{"type1"}; + std::string type2{"type2"}; +}; + +TEST_F(EdgesIterableTest, OutEdges) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + auto iterable = va.out(); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e1_out); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w1_e2_out); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w1_e3_out); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e1_out); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e2_out); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e3_out); + ++i; + EXPECT_EQ(iterable.end(), i); +} + +TEST_F(EdgesIterableTest, InEdges) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + auto iterable = va.in(); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e1_in); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w1_e2_in); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w1_e3_in); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e1_in); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e2_in); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e3_in); + ++i; + EXPECT_EQ(iterable.end(), i); +} + +TEST_F(EdgesIterableTest, InEdgesDestAddrFilter) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + { + VertexAccessor va_dest(*w1_v2_in, *dba); + auto iterable = va.in(va_dest); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e2_in); + ++i; + EXPECT_EQ(iterable.end(), i); + } + { + VertexAccessor va_dest(*w2_v1_in, *dba); + auto iterable = va.in(va_dest); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w2_e1_in); + ++i; + EXPECT_EQ(iterable.end(), i); + } +} + +TEST_F(EdgesIterableTest, OutEdgesDestAddrFilter) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + { + VertexAccessor va_dest(*w1_v2_out, *dba); + auto iterable = va.out(va_dest); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e2_out); + ++i; + EXPECT_EQ(iterable.end(), i); + } + { + VertexAccessor va_dest(*w2_v1_out, *dba); + auto iterable = va.out(va_dest); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w2_e1_out); + ++i; + EXPECT_EQ(iterable.end(), i); + } +} + +TEST_F(EdgesIterableTest, InEdgesEdgeTypeFilter) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + std::vector edge_types{dba->EdgeType(type2)}; + auto iterable = va.in(&edge_types); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e3_in); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e3_in); + ++i; + EXPECT_EQ(iterable.end(), i); +} + +TEST_F(EdgesIterableTest, OutEdgesEdgeTypeFilter) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + std::vector edge_types{dba->EdgeType(type2)}; + auto iterable = va.out(&edge_types); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e3_out); + ++i; + EXPECT_EQ(i->GlobalAddress(), *w2_e3_out); + ++i; + EXPECT_EQ(iterable.end(), i); +} + +TEST_F(EdgesIterableTest, InEdgesEdgeTypeAndDestFilter) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + VertexAccessor va_dest(*w1_v1_in, *dba); + std::vector edge_types{dba->EdgeType(type1)}; + auto iterable = va.in(va_dest, &edge_types); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e1_in); + ++i; + EXPECT_EQ(iterable.end(), i); +} + +TEST_F(EdgesIterableTest, OutEdgesEdgeTypeAndDestFilter) { + auto dba = master().Access(); + VertexAccessor va(*v, *dba); + VertexAccessor va_dest(*w1_v1_out, *dba); + std::vector edge_types{dba->EdgeType(type1)}; + auto iterable = va.out(va_dest, &edge_types); + auto i = iterable.begin(); + EXPECT_EQ(i->GlobalAddress(), *w1_e1_out); + ++i; + EXPECT_EQ(iterable.end(), i); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + google::InitGoogleLogging(argv[0]); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 65f3bc197..5fed1f1fc 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -511,7 +511,11 @@ class DistributedEdgeRemoveTest : public DistributedGraphDbTest { template auto Size(TIterable iterable) { - return std::distance(iterable.begin(), iterable.end()); + size_t size = 0; + for (auto i = iterable.begin(); i != iterable.end(); ++i) { + ++size; + } + return size; }; void CheckCreation() {