Separate distributed from single node GraphDb

Summary:
To clean the working directory after this diff you should execute:

```
rm src/database/counters_rpc_messages.capnp
rm src/database/counters_rpc_messages.hpp
rm src/database/serialization.capnp
rm src/database/serialization.hpp
```

Reviewers: teon.banek, msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1636
This commit is contained in:
Matej Ferencevic 2018-10-05 12:37:23 +02:00
parent fdf609b9d8
commit ade2593b51
97 changed files with 2344 additions and 1012 deletions

8
.gitignore vendored
View File

@ -40,10 +40,10 @@ TAGS
# LCP generated C++ & Cap'n Proto files
*.lcp.cpp
src/database/counters_rpc_messages.capnp
src/database/counters_rpc_messages.hpp
src/database/serialization.capnp
src/database/serialization.hpp
src/database/distributed/counters_rpc_messages.capnp
src/database/distributed/counters_rpc_messages.hpp
src/database/distributed/serialization.capnp
src/database/distributed/serialization.hpp
src/distributed/bfs_rpc_messages.capnp
src/distributed/bfs_rpc_messages.hpp
src/distributed/coordination_rpc_messages.capnp

View File

@ -15,9 +15,9 @@ add_subdirectory(auth)
# ----------------------------------------------------------------------------
set(mg_single_node_sources
data_structures/concurrent/skiplist_gc.cpp
database/config.cpp
database/graph_db.cpp
database/graph_db_accessor.cpp
database/single_node/config.cpp
database/single_node/graph_db.cpp
database/single_node/graph_db_accessor.cpp
durability/single_node/state_delta.cpp
durability/paths.cpp
durability/single_node/recovery.cpp
@ -87,8 +87,8 @@ target_compile_definitions(mg-single-node PUBLIC MG_SINGLE_NODE)
# ----------------------------------------------------------------------------
set(mg_distributed_sources
database/distributed_counters.cpp
database/distributed_graph_db.cpp
database/distributed/distributed_counters.cpp
database/distributed/distributed_graph_db.cpp
distributed/bfs_rpc_clients.cpp
distributed/bfs_subcursor.cpp
distributed/cluster_discovery_master.cpp
@ -120,8 +120,8 @@ set(mg_distributed_sources
transactions/distributed/engine_master.cpp
transactions/distributed/engine_worker.cpp
data_structures/concurrent/skiplist_gc.cpp
database/config.cpp
database/graph_db_accessor.cpp
database/distributed/config.cpp
database/distributed/graph_db_accessor.cpp
durability/distributed/state_delta.cpp
durability/paths.cpp
durability/distributed/recovery.cpp
@ -163,11 +163,11 @@ define_add_capnp(mg_distributed_sources generated_capnp_files)
define_add_lcp(add_lcp_distributed mg_distributed_sources generated_lcp_distributed_files)
add_lcp_distributed(durability/distributed/state_delta.lcp)
add_lcp_distributed(database/counters_rpc_messages.lcp CAPNP_SCHEMA @0x95a2c3ea3871e945)
add_capnp(database/counters_rpc_messages.capnp)
add_lcp_distributed(database/serialization.lcp CAPNP_SCHEMA @0xdea01657b3563887
add_lcp_distributed(database/distributed/counters_rpc_messages.lcp CAPNP_SCHEMA @0x95a2c3ea3871e945)
add_capnp(database/distributed/counters_rpc_messages.capnp)
add_lcp_distributed(database/distributed/serialization.lcp CAPNP_SCHEMA @0xdea01657b3563887
DEPENDS durability/distributed/state_delta.lcp)
add_capnp(database/serialization.capnp)
add_capnp(database/distributed/serialization.capnp)
add_lcp_distributed(distributed/bfs_rpc_messages.lcp CAPNP_SCHEMA @0x8e508640b09b6d2a)
add_capnp(distributed/bfs_rpc_messages.capnp)
add_lcp_distributed(distributed/coordination_rpc_messages.lcp CAPNP_SCHEMA @0x93df0c4703cf98fb)

View File

@ -1,13 +1,7 @@
#include <limits>
#include "database/graph_db.hpp"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "storage/single_node/gid.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "database/distributed/graph_db.hpp"
#include "storage/distributed/gid.hpp"
#endif
#include "utils/flag_validation.hpp"
#include "utils/string.hpp"
@ -43,7 +37,6 @@ DEFINE_bool(synchronous_commit, false,
"Should a transaction end wait for WAL records to be written to "
"disk before the transaction finishes.");
#ifndef MG_COMMUNITY
// Distributed master/worker flags.
DEFINE_VALIDATED_HIDDEN_int32(worker_id, 0,
"ID of a worker in a distributed system. Igored "
@ -83,9 +76,7 @@ DEFINE_VALIDATED_int32(recovering_cluster_size, 0,
// The implementation should be straightforward.
DEFINE_bool(dynamic_graph_partitioner_enabled, false,
"If the dynamic graph partitioner should be enabled.");
#endif
// clang-format off
database::Config::Config()
// Durability flags.
: durability_enabled{FLAGS_durability_enabled},
@ -99,11 +90,10 @@ database::Config::Config()
gc_cycle_sec{FLAGS_gc_cycle_sec},
query_execution_time_sec{FLAGS_query_execution_time_sec},
// Data location.
properties_on_disk(utils::Split(FLAGS_properties_on_disk, ","))
#ifndef MG_COMMUNITY
,
properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")),
// Distributed flags.
dynamic_graph_partitioner_enabled{FLAGS_dynamic_graph_partitioner_enabled},
dynamic_graph_partitioner_enabled{
FLAGS_dynamic_graph_partitioner_enabled},
rpc_num_client_workers{FLAGS_rpc_num_client_workers},
rpc_num_server_workers{FLAGS_rpc_num_server_workers},
worker_id{FLAGS_worker_id},
@ -111,7 +101,4 @@ database::Config::Config()
static_cast<uint16_t>(FLAGS_master_port)},
worker_endpoint{FLAGS_worker_host,
static_cast<uint16_t>(FLAGS_worker_port)},
recovering_cluster_size{FLAGS_recovering_cluster_size}
#endif
{}
// clang-format on
recovering_cluster_size{FLAGS_recovering_cluster_size} {}

View File

@ -4,7 +4,7 @@
#include <string>
#include "communication/rpc/messages.hpp"
#include "database/counters_rpc_messages.capnp.h"
#include "database/distributed/counters_rpc_messages.capnp.h"
cpp<#
(lcp:namespace database)

View File

@ -1,8 +1,8 @@
#include "database/distributed_counters.hpp"
#include "database/distributed/distributed_counters.hpp"
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "database/counters_rpc_messages.hpp"
#include "database/distributed/counters_rpc_messages.hpp"
namespace database {

View File

@ -1,6 +1,6 @@
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed_counters.hpp"
#include "database/distributed/distributed_counters.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/bfs_rpc_server.hpp"
#include "distributed/bfs_subcursor.hpp"

View File

@ -2,7 +2,7 @@
#pragma once
#include "database/graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "durability/distributed/version.hpp"
namespace distributed {

View File

@ -0,0 +1,164 @@
/// @file
#pragma once
#include <atomic>
#include <memory>
#include <vector>
#include "database/counters.hpp"
#include "durability/distributed/recovery.hpp"
#include "durability/distributed/wal.hpp"
#include "io/network/endpoint.hpp"
#include "storage/common/concurrent_id_mapper.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/storage.hpp"
#include "storage/distributed/storage_gc.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
namespace database {
/// Database configuration. Initialized from flags, but modifiable.
struct Config {
Config();
// Durability flags.
bool durability_enabled;
std::string durability_directory;
bool db_recover_on_startup;
int snapshot_cycle_sec;
int snapshot_max_retained;
int snapshot_on_exit;
bool synchronous_commit;
// Misc flags.
int gc_cycle_sec;
int query_execution_time_sec;
// set of properties which will be stored on disk
std::vector<std::string> properties_on_disk;
// Distributed master/worker flags.
bool dynamic_graph_partitioner_enabled{false};
int rpc_num_client_workers{0};
int rpc_num_server_workers{0};
int worker_id{0};
io::network::Endpoint master_endpoint{"0.0.0.0", 0};
io::network::Endpoint worker_endpoint{"0.0.0.0", 0};
int recovering_cluster_size{0};
};
class GraphDbAccessor;
/// An abstract base class providing the interface for a graph database.
///
/// Always be sure that GraphDb object is destructed before main exits, i. e.
/// GraphDb object shouldn't be part of global/static variable, except if its
/// destructor is explicitly called before main exits. Consider code:
///
/// GraphDb db; // KeyIndex is created as a part of database::Storage
/// int main() {
/// GraphDbAccessor dba(db);
/// auto v = dba.InsertVertex();
/// v.add_label(dba.Label(
/// "Start")); // New SkipList is created in KeyIndex for LabelIndex.
/// // That SkipList creates SkipListGc which
/// // initialises static Executor object.
/// return 0;
/// }
///
/// After main exits: 1. Executor is destructed, 2. KeyIndex is destructed.
/// Destructor of KeyIndex calls delete on created SkipLists which destroy
/// SkipListGc that tries to use Excutioner object that doesn't exist anymore.
/// -> CRASH
class GraphDb {
public:
GraphDb() {}
GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete;
GraphDb &operator=(const GraphDb &) = delete;
GraphDb &operator=(GraphDb &&) = delete;
virtual ~GraphDb() {}
/// Create a new accessor by starting a new transaction.
virtual std::unique_ptr<GraphDbAccessor> Access() = 0;
/// Create an accessor for a running transaction.
virtual std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId) = 0;
virtual Storage &storage() = 0;
virtual durability::WriteAheadLog &wal() = 0;
virtual tx::Engine &tx_engine() = 0;
virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::EdgeType>
&edge_type_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::Property> &property_mapper() = 0;
virtual database::Counters &counters() = 0;
virtual void CollectGarbage() = 0;
/// Makes a snapshot from the visibility of the given accessor
virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0;
/// Releases the storage object safely and creates a new object.
/// This is needed because of recovery, otherwise we might try to recover into
/// a storage which has already been polluted because of a failed previous
/// recovery
virtual void ReinitializeStorage() = 0;
/// When this is false, no new transactions should be created.
bool is_accepting_transactions() const { return is_accepting_transactions_; }
protected:
std::atomic<bool> is_accepting_transactions_{true};
};
namespace impl {
class SingleNode;
} // namespace impl
class SingleNode final : public GraphDb {
public:
explicit SingleNode(Config config = Config());
~SingleNode();
std::unique_ptr<GraphDbAccessor> Access() override;
std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId) override;
Storage &storage() override;
durability::WriteAheadLog &wal() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
database::Counters &counters() override;
void CollectGarbage() override;
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
private:
std::unique_ptr<impl::SingleNode> impl_;
std::unique_ptr<utils::Scheduler> snapshot_creator_;
utils::Scheduler transaction_killer_;
};
class SingleNodeRecoveryTransanctions final
: public durability::RecoveryTransactions {
public:
explicit SingleNodeRecoveryTransanctions(SingleNode *db);
~SingleNodeRecoveryTransanctions();
void Begin(const tx::TransactionId &tx_id) override;
void Abort(const tx::TransactionId &tx_id) override;
void Commit(const tx::TransactionId &tx_id) override;
void Apply(const database::StateDelta &delta) override;
private:
SingleNode *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
} // namespace database

View File

@ -1,28 +1,16 @@
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include <chrono>
#include <thread>
#include <glog/logging.h>
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "durability/single_node/state_delta.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/edge.hpp"
#include "storage/single_node/edge_accessor.hpp"
#include "storage/single_node/vertex.hpp"
#include "storage/single_node/vertex_accessor.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/vertex.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#endif
#include "utils/cast.hpp"
#include "utils/on_scope_exit.hpp"

View File

@ -0,0 +1,672 @@
/// @file
#pragma once
#include <experimental/optional>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <cppitertools/filter.hpp>
#include <cppitertools/imap.hpp>
#include "database/distributed/graph_db.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#include "transactions/transaction.hpp"
#include "transactions/type.hpp"
#include "utils/bound.hpp"
#include "utils/exceptions.hpp"
namespace database {
/** Thrown when creating an index which already exists. */
class IndexExistsException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/** Thrown when creating an index which already exists. */
class IndexCreationOnWorkerException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/**
* Base accessor for the database object: exposes functions for operating on the
* database. All the functions in this class should be self-sufficient: for
* example the function for creating a new Vertex should take care of all the
* book-keeping around the creation.
*/
class GraphDbAccessor {
// We need to make friends with this guys since they need to access private
// methods for updating indices.
// TODO: Rethink this, we have too much long-distance friendship complicating
// the code.
friend class ::RecordAccessor<Vertex>;
friend class ::VertexAccessor;
protected:
// Construction should only be done through GraphDb::Access function and
// concrete GraphDbAccessor type.
/// Creates a new accessor by starting a new transaction.
explicit GraphDbAccessor(GraphDb &db);
/// Creates an accessor for a running transaction.
GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id);
public:
virtual ~GraphDbAccessor();
GraphDbAccessor(const GraphDbAccessor &other) = delete;
GraphDbAccessor(GraphDbAccessor &&other) = delete;
GraphDbAccessor &operator=(const GraphDbAccessor &other) = delete;
GraphDbAccessor &operator=(GraphDbAccessor &&other) = delete;
virtual ::VertexAccessor::Impl *GetVertexImpl() = 0;
virtual ::RecordAccessor<Edge>::Impl *GetEdgeImpl() = 0;
/**
* Creates a new Vertex and returns an accessor to it. If the ID is
* provided, the created Vertex will have that local ID, and the ID counter
* will be increased to it so collisions are avoided. This should only be used
* by durability recovery, normal vertex creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*
* @return See above.
*/
VertexAccessor InsertVertex(std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Removes the vertex of the given accessor. If the vertex has any outgoing or
* incoming edges, it is not deleted. See `DetachRemoveVertex` if you want to
* remove a vertex regardless of connectivity.
*
* If the vertex has already been deleted by the current transaction+command,
* this function will not do anything and will return true.
*
* @param vertex_accessor Accessor to vertex.
* @param check_empty If the vertex should be checked for existing edges
* before deletion.
* @return If or not the vertex was deleted.
*/
virtual bool RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty = true);
/**
* Removes the vertex of the given accessor along with all it's outgoing
* and incoming connections.
*
* @param vertex_accessor Accessor to a vertex.
*/
void DetachRemoveVertex(VertexAccessor &vertex_accessor);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param gid - The GID of the sought vertex.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<VertexAccessor> FindVertexOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed
* using a CHECK.
*
* @param gid - The GID of the sought vertex.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
VertexAccessor FindVertex(gid::Gid gid, bool current_state);
/**
* Returns iterable over accessors to all the vertices in the graph
* visible to the current transaction.
*
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
auto Vertices(bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors = iter::imap(
[this](auto id_vlist) {
return VertexAccessor(storage::VertexAddress(id_vlist.second), *this);
},
db_.storage().vertices_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const VertexAccessor &accessor) {
return accessor.Visible(transaction(), current_state);
},
std::move(accessors));
}
/**
* Return VertexAccessors which contain the current label for the current
* transaction visibilty.
* @param label - label for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().labels_index_.GetVlists(label, transaction_,
current_state));
}
/**
* Return VertexAccessors which contain the current label and property for the
* given transaction visibility.
*
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, storage::Property property,
bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), transaction_,
current_state));
}
/**
* Return VertexAccessors which contain the current label + property, and
* those properties are equal to this 'value' for the given transaction
* visibility.
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param value - property value for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, storage::Property property,
const PropertyValue &value, bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
CHECK(value.type() != PropertyValue::Type::Null)
<< "Can't query index for propery value type null.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), value, transaction_,
current_state));
}
/**
* Return an iterable over VertexAccessors which contain the
* given label and whose property value (for the given property)
* falls within the given (lower, upper) @c Bound.
*
* The returned iterator will only contain
* vertices/edges whose property value is comparable with the
* given bounds (w.r.t. type). This has implications on Cypher
* query execuction semantics which have not been resovled yet.
*
* At least one of the bounds must be specified. Bonds can't be
* @c PropertyValue::Null. If both bounds are
* specified, their PropertyValue elments must be of comparable
* types.
*
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param lower - Lower bound of the interval.
* @param upper - Upper bound of the interval.
* @param value - property value for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection of record accessors
* satisfy the bounds and are visible to the current transaction.
*/
auto Vertices(
storage::Label label, storage::Property property,
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
const std::experimental::optional<utils::Bound<PropertyValue>> upper,
bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), lower, upper,
transaction_, current_state));
}
/**
* Creates a new Edge and returns an accessor to it. If the ID is
* provided, the created Edge will have that ID, and the ID counter will be
* increased to it so collisions are avoided. This should only be used by
* durability recovery, normal edge creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param from The 'from' vertex.
* @param to The 'to' vertex'
* @param type Edge type.
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*
* @return An accessor to the edge.
*/
EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to,
storage::EdgeType type,
std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Insert edge into main storage, but don't insert it into from and to
* vertices edge lists.
*
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*/
EdgeAccessor InsertOnlyEdge(storage::VertexAddress from,
storage::VertexAddress to,
storage::EdgeType edge_type,
std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Removes an edge from the graph. Parameters can indicate if the edge should
* be removed from data structures in vertices it connects. When removing an
* edge both arguments should be `true`. `false` is only used when
* detach-deleting a vertex.
*
* @param edge The accessor to an edge.
* @param remove_out_edge If the edge should be removed from the its origin
* side.
* @param remove_in_edge If the edge should be removed from the its
* destination side.
*/
virtual void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true,
bool remove_in_edge = true);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param gid - The GID of the sought edge.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<EdgeAccessor> FindEdgeOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed
* using a CHECK.
*
* @param gid - The GID of the sought edge.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
EdgeAccessor FindEdge(gid::Gid gid, bool current_state);
/**
* Returns iterable over accessors to all the edges in the graph
* visible to the current transaction.
*
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
auto Edges(bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors = iter::imap(
[this](auto id_vlist) {
return EdgeAccessor(storage::EdgeAddress(id_vlist.second), *this);
},
db_.storage().edges_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const EdgeAccessor &accessor) {
return accessor.Visible(transaction(), current_state);
},
std::move(accessors));
}
/**
* Creates and returns a new accessor that represents the same graph element
* (node / version) as the given `accessor`, but in this `GraphDbAccessor`.
*
* It is possible that the given `accessor` graph element is not visible in
* this `GraphDbAccessor`'s transaction. If that is the case, a `nullopt` is
* returned.
*
* The returned accessor does NOT have the same `current_` set as the given
* `accessor`. It has default post-construction `current_` set (`old` if
* available, otherwise `new`).
*
* @param accessor The [Vertex/Edge]Accessor whose underlying graph element we
* want in this GraphDbAccessor.
* @return See above.
* @tparam TAccessor Either VertexAccessor or EdgeAccessor
*/
template <typename TAccessor>
std::experimental::optional<TAccessor> Transfer(const TAccessor &accessor) {
if (accessor.db_accessor_ == this)
return std::experimental::make_optional(accessor);
TAccessor accessor_in_this(accessor.address(), *this);
if (accessor_in_this.current_)
return std::experimental::make_optional(std::move(accessor_in_this));
else
return std::experimental::nullopt;
}
/**
* Adds an index for the given (label, property) and populates it with
* existing vertices that belong to it.
*
* You should never call BuildIndex on a GraphDbAccessor (transaction) on
* which new vertices have been inserted or existing ones updated. Do it
* in a new accessor instead.
*
* Build index throws if an index for the given (label, property) already
* exists (even if it's being built by a concurrent transaction and is not yet
* ready for use).
*
* It also throws if there is another index being built concurrently on the
* same database this accessor is for.
*
* @param label - label to build for
* @param property - property to build for
*/
virtual void BuildIndex(storage::Label label, storage::Property property);
/// Populates index with vertices containing the key
void PopulateIndex(const LabelPropertyIndex::Key &key);
/// Writes Index (key) creation to wal, marks it as ready for usage
void EnableIndex(const LabelPropertyIndex::Key &key);
/**
* @brief - Returns true if the given label+property index already exists and
* is ready for use.
*/
bool LabelPropertyIndexExists(storage::Label label,
storage::Property property) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property));
}
/**
* @brief - Returns vector of keys of label-property indices.
*/
std::vector<LabelPropertyIndex::Key> GetIndicesKeys() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().label_property_index_.Keys();
}
/**
* Return approximate number of all vertices in the database.
* Note that this is always an over-estimate and never an under-estimate.
*/
int64_t VerticesCount() const;
/*
* Return approximate number of all edges in the database.
* Note that this is always an over-estimate and never an under-estimate.
*/
int64_t EdgesCount() const;
/**
* Return approximate number of vertices under indexes with the given label.
* Note that this is always an over-estimate and never an under-estimate.
*
* @param label - label to check for
* @return number of vertices with the given label
*/
int64_t VerticesCount(storage::Label label) const;
/**
* Return approximate number of vertices under indexes with the given label
* and property. Note that this is always an over-estimate and never an
* under-estimate.
*
* @param label - label to check for
* @param property - property to check for
* @return number of vertices with the given label, fails if no such
* label+property index exists.
*/
int64_t VerticesCount(storage::Label label, storage::Property property) const;
/**
* Returns approximate number of vertices that have the given label
* and the given value for the given property.
*
* Assumes that an index for that (label, property) exists.
*/
int64_t VerticesCount(storage::Label label, storage::Property property,
const PropertyValue &value) const;
/**
* Returns approximate number of vertices that have the given label
* and whose vaue is in the range defined by upper and lower @c Bound.
*
* At least one bound must be specified. Neither can be
* PropertyValue::Null.
*
* Assumes that an index for that (label, property) exists.
*/
int64_t VerticesCount(
storage::Label label, storage::Property property,
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
const std::experimental::optional<utils::Bound<PropertyValue>> upper)
const;
/**
* Obtains the Label for the label's name.
* @return See above.
*/
storage::Label Label(const std::string &label_name);
/**
* Obtains the label name (a string) for the given label.
*
* @param label a Label.
* @return See above.
*/
const std::string &LabelName(storage::Label label) const;
/**
* Obtains the EdgeType for it's name.
* @return See above.
*/
storage::EdgeType EdgeType(const std::string &edge_type_name);
/**
* Obtains the edge type name (a string) for the given edge type.
*
* @param edge_type an EdgeType.
* @return See above.
*/
const std::string &EdgeTypeName(storage::EdgeType edge_type) const;
/**
* Obtains the Property for it's name.
* @return See above.
*/
storage::Property Property(const std::string &property_name);
/**
* Obtains the property name (a string) for the given property.
*
* @param property a Property.
* @return See above.
*/
const std::string &PropertyName(storage::Property property) const;
/** Returns the id of this accessor's transaction */
tx::TransactionId transaction_id() const;
/** Advances transaction's command id by 1. */
virtual void AdvanceCommand();
/** Commit transaction. */
void Commit();
/** Abort transaction. */
void Abort();
/** Return true if transaction is hinted to abort. */
bool should_abort() const;
const tx::Transaction &transaction() const { return transaction_; }
durability::WriteAheadLog &wal();
auto &db() { return db_; }
const auto &db() const { return db_; }
/**
* Returns the current value of the counter with the given name, and
* increments that counter. If the counter with the given name does not exist,
* a new counter is created and this function returns 0.
*/
int64_t Counter(const std::string &name);
/**
* Sets the counter with the given name to the given value. Returns nothing.
* If the counter with the given name does not exist, a new counter is created
* and set to the given value.
*/
void CounterSet(const std::string &name, int64_t value);
/* Returns a list of index names present in the database. */
std::vector<std::string> IndexInfo() const;
/**
* Insert this vertex into corresponding label and label+property (if it
* exists) index.
*
* @param label - label with which to insert vertex label record
* @param vertex_accessor - vertex_accessor to insert
* @param vertex - vertex record to insert
*/
void UpdateLabelIndices(storage::Label label,
const VertexAccessor &vertex_accessor,
const Vertex *const vertex);
protected:
/** Called in `BuildIndex` after creating an index, but before populating. */
virtual void PostCreateIndex(const LabelPropertyIndex::Key &key) {}
/** Populates the index from a *new* transaction after creating the index. */
virtual void PopulateIndexFromBuildIndex(const LabelPropertyIndex::Key &key) {
PopulateIndex(key);
}
/**
* Insert a new edge to `from` vertex and return the address.
* Called from `InsertEdge` as the first step in edge insertion.
* */
virtual storage::EdgeAddress InsertEdgeOnFrom(
VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const std::experimental::optional<gid::Gid> &requested_gid,
const std::experimental::optional<int64_t> &cypher_id);
/**
* Set the newly created edge on `to` vertex.
* Called after `InsertEdgeOnFrom` in `InsertEdge`. The given `edge_address`
* is from the created edge, returned by `InsertEdgeOnFrom`.
*/
virtual void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address);
private:
GraphDb &db_;
tx::Transaction &transaction_;
// Indicates if this db-accessor started the transaction and should Abort it
// upon destruction.
bool transaction_starter_;
bool commited_{false};
bool aborted_{false};
/**
* Insert this vertex into corresponding any label + 'property' index.
* @param property - vertex will be inserted into indexes which contain this
* property
* @param vertex_accessor - vertex accessor to insert
* @param vertex - vertex to insert
*/
void UpdatePropertyIndex(storage::Property property,
const RecordAccessor<Vertex> &vertex_accessor,
const Vertex *const vertex);
};
} // namespace database

View File

@ -1,7 +1,7 @@
#>cpp
#pragma once
#include "database/serialization.capnp.h"
#include "database/distributed/serialization.capnp.h"
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/serialization.hpp"
cpp<#

View File

@ -1,176 +1,8 @@
/// @file
#pragma once
#include <atomic>
#include <memory>
#include <vector>
#include "database/counters.hpp"
#include "io/network/endpoint.hpp"
#include "storage/common/concurrent_id_mapper.hpp"
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "durability/single_node/recovery.hpp"
#include "durability/single_node/wal.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/storage.hpp"
#include "storage/single_node/storage_gc.hpp"
#include "database/single_node/graph_db.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "durability/distributed/recovery.hpp"
#include "durability/distributed/wal.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/storage.hpp"
#include "storage/distributed/storage_gc.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#include "database/distributed/graph_db.hpp"
#endif
namespace database {
/// Database configuration. Initialized from flags, but modifiable.
struct Config {
Config();
// Durability flags.
bool durability_enabled;
std::string durability_directory;
bool db_recover_on_startup;
int snapshot_cycle_sec;
int snapshot_max_retained;
int snapshot_on_exit;
bool synchronous_commit;
// Misc flags.
int gc_cycle_sec;
int query_execution_time_sec;
// set of properties which will be stored on disk
std::vector<std::string> properties_on_disk;
// Distributed master/worker flags.
bool dynamic_graph_partitioner_enabled{false};
int rpc_num_client_workers{0};
int rpc_num_server_workers{0};
int worker_id{0};
io::network::Endpoint master_endpoint{"0.0.0.0", 0};
io::network::Endpoint worker_endpoint{"0.0.0.0", 0};
int recovering_cluster_size{0};
};
class GraphDbAccessor;
/// An abstract base class providing the interface for a graph database.
///
/// Always be sure that GraphDb object is destructed before main exits, i. e.
/// GraphDb object shouldn't be part of global/static variable, except if its
/// destructor is explicitly called before main exits. Consider code:
///
/// GraphDb db; // KeyIndex is created as a part of database::Storage
/// int main() {
/// GraphDbAccessor dba(db);
/// auto v = dba.InsertVertex();
/// v.add_label(dba.Label(
/// "Start")); // New SkipList is created in KeyIndex for LabelIndex.
/// // That SkipList creates SkipListGc which
/// // initialises static Executor object.
/// return 0;
/// }
///
/// After main exits: 1. Executor is destructed, 2. KeyIndex is destructed.
/// Destructor of KeyIndex calls delete on created SkipLists which destroy
/// SkipListGc that tries to use Excutioner object that doesn't exist anymore.
/// -> CRASH
class GraphDb {
public:
GraphDb() {}
GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete;
GraphDb &operator=(const GraphDb &) = delete;
GraphDb &operator=(GraphDb &&) = delete;
virtual ~GraphDb() {}
/// Create a new accessor by starting a new transaction.
virtual std::unique_ptr<GraphDbAccessor> Access() = 0;
/// Create an accessor for a running transaction.
virtual std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId) = 0;
virtual Storage &storage() = 0;
virtual durability::WriteAheadLog &wal() = 0;
virtual tx::Engine &tx_engine() = 0;
virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::EdgeType>
&edge_type_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::Property> &property_mapper() = 0;
virtual database::Counters &counters() = 0;
virtual void CollectGarbage() = 0;
/// Makes a snapshot from the visibility of the given accessor
virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0;
/// Releases the storage object safely and creates a new object.
/// This is needed because of recovery, otherwise we might try to recover into
/// a storage which has already been polluted because of a failed previous
/// recovery
virtual void ReinitializeStorage() = 0;
/// When this is false, no new transactions should be created.
bool is_accepting_transactions() const { return is_accepting_transactions_; }
protected:
std::atomic<bool> is_accepting_transactions_{true};
};
namespace impl {
class SingleNode;
} // namespace impl
class SingleNode final : public GraphDb {
public:
explicit SingleNode(Config config = Config());
~SingleNode();
std::unique_ptr<GraphDbAccessor> Access() override;
std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId) override;
Storage &storage() override;
durability::WriteAheadLog &wal() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
database::Counters &counters() override;
void CollectGarbage() override;
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
private:
std::unique_ptr<impl::SingleNode> impl_;
std::unique_ptr<utils::Scheduler> snapshot_creator_;
utils::Scheduler transaction_killer_;
};
class SingleNodeRecoveryTransanctions final
: public durability::RecoveryTransactions {
public:
explicit SingleNodeRecoveryTransanctions(SingleNode *db);
~SingleNodeRecoveryTransanctions();
void Begin(const tx::TransactionId &tx_id) override;
void Abort(const tx::TransactionId &tx_id) override;
void Commit(const tx::TransactionId &tx_id) override;
void Apply(const database::StateDelta &delta) override;
private:
SingleNode *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
} // namespace database

View File

@ -1,682 +1,8 @@
/// @file
#pragma once
#include <experimental/optional>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <cppitertools/filter.hpp>
#include <cppitertools/imap.hpp>
#include "database/graph_db.hpp"
#include "transactions/transaction.hpp"
#include "transactions/type.hpp"
#include "utils/bound.hpp"
#include "utils/exceptions.hpp"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "storage/common/types.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/edge_accessor.hpp"
#include "storage/single_node/vertex_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "storage/common/types.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#endif
namespace database {
/** Thrown when creating an index which already exists. */
class IndexExistsException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/** Thrown when creating an index which already exists. */
class IndexCreationOnWorkerException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/**
* Base accessor for the database object: exposes functions for operating on the
* database. All the functions in this class should be self-sufficient: for
* example the function for creating a new Vertex should take care of all the
* book-keeping around the creation.
*/
class GraphDbAccessor {
// We need to make friends with this guys since they need to access private
// methods for updating indices.
// TODO: Rethink this, we have too much long-distance friendship complicating
// the code.
friend class ::RecordAccessor<Vertex>;
friend class ::VertexAccessor;
protected:
// Construction should only be done through GraphDb::Access function and
// concrete GraphDbAccessor type.
/// Creates a new accessor by starting a new transaction.
explicit GraphDbAccessor(GraphDb &db);
/// Creates an accessor for a running transaction.
GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id);
public:
virtual ~GraphDbAccessor();
GraphDbAccessor(const GraphDbAccessor &other) = delete;
GraphDbAccessor(GraphDbAccessor &&other) = delete;
GraphDbAccessor &operator=(const GraphDbAccessor &other) = delete;
GraphDbAccessor &operator=(GraphDbAccessor &&other) = delete;
virtual ::VertexAccessor::Impl *GetVertexImpl() = 0;
virtual ::RecordAccessor<Edge>::Impl *GetEdgeImpl() = 0;
/**
* Creates a new Vertex and returns an accessor to it. If the ID is
* provided, the created Vertex will have that local ID, and the ID counter
* will be increased to it so collisions are avoided. This should only be used
* by durability recovery, normal vertex creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*
* @return See above.
*/
VertexAccessor InsertVertex(std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Removes the vertex of the given accessor. If the vertex has any outgoing or
* incoming edges, it is not deleted. See `DetachRemoveVertex` if you want to
* remove a vertex regardless of connectivity.
*
* If the vertex has already been deleted by the current transaction+command,
* this function will not do anything and will return true.
*
* @param vertex_accessor Accessor to vertex.
* @param check_empty If the vertex should be checked for existing edges
* before deletion.
* @return If or not the vertex was deleted.
*/
virtual bool RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty = true);
/**
* Removes the vertex of the given accessor along with all it's outgoing
* and incoming connections.
*
* @param vertex_accessor Accessor to a vertex.
*/
void DetachRemoveVertex(VertexAccessor &vertex_accessor);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param gid - The GID of the sought vertex.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<VertexAccessor> FindVertexOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed
* using a CHECK.
*
* @param gid - The GID of the sought vertex.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
VertexAccessor FindVertex(gid::Gid gid, bool current_state);
/**
* Returns iterable over accessors to all the vertices in the graph
* visible to the current transaction.
*
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
auto Vertices(bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors = iter::imap(
[this](auto id_vlist) {
return VertexAccessor(storage::VertexAddress(id_vlist.second), *this);
},
db_.storage().vertices_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const VertexAccessor &accessor) {
return accessor.Visible(transaction(), current_state);
},
std::move(accessors));
}
/**
* Return VertexAccessors which contain the current label for the current
* transaction visibilty.
* @param label - label for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().labels_index_.GetVlists(label, transaction_,
current_state));
}
/**
* Return VertexAccessors which contain the current label and property for the
* given transaction visibility.
*
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, storage::Property property,
bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), transaction_,
current_state));
}
/**
* Return VertexAccessors which contain the current label + property, and
* those properties are equal to this 'value' for the given transaction
* visibility.
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param value - property value for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, storage::Property property,
const PropertyValue &value, bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
CHECK(value.type() != PropertyValue::Type::Null)
<< "Can't query index for propery value type null.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), value, transaction_,
current_state));
}
/**
* Return an iterable over VertexAccessors which contain the
* given label and whose property value (for the given property)
* falls within the given (lower, upper) @c Bound.
*
* The returned iterator will only contain
* vertices/edges whose property value is comparable with the
* given bounds (w.r.t. type). This has implications on Cypher
* query execuction semantics which have not been resovled yet.
*
* At least one of the bounds must be specified. Bonds can't be
* @c PropertyValue::Null. If both bounds are
* specified, their PropertyValue elments must be of comparable
* types.
*
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param lower - Lower bound of the interval.
* @param upper - Upper bound of the interval.
* @param value - property value for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection of record accessors
* satisfy the bounds and are visible to the current transaction.
*/
auto Vertices(
storage::Label label, storage::Property property,
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
const std::experimental::optional<utils::Bound<PropertyValue>> upper,
bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), lower, upper,
transaction_, current_state));
}
/**
* Creates a new Edge and returns an accessor to it. If the ID is
* provided, the created Edge will have that ID, and the ID counter will be
* increased to it so collisions are avoided. This should only be used by
* durability recovery, normal edge creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param from The 'from' vertex.
* @param to The 'to' vertex'
* @param type Edge type.
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*
* @return An accessor to the edge.
*/
EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to,
storage::EdgeType type,
std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Insert edge into main storage, but don't insert it into from and to
* vertices edge lists.
*
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*/
EdgeAccessor InsertOnlyEdge(storage::VertexAddress from,
storage::VertexAddress to,
storage::EdgeType edge_type,
std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Removes an edge from the graph. Parameters can indicate if the edge should
* be removed from data structures in vertices it connects. When removing an
* edge both arguments should be `true`. `false` is only used when
* detach-deleting a vertex.
*
* @param edge The accessor to an edge.
* @param remove_out_edge If the edge should be removed from the its origin
* side.
* @param remove_in_edge If the edge should be removed from the its
* destination side.
*/
virtual void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true,
bool remove_in_edge = true);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param gid - The GID of the sought edge.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<EdgeAccessor> FindEdgeOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed
* using a CHECK.
*
* @param gid - The GID of the sought edge.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
EdgeAccessor FindEdge(gid::Gid gid, bool current_state);
/**
* Returns iterable over accessors to all the edges in the graph
* visible to the current transaction.
*
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
auto Edges(bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors = iter::imap(
[this](auto id_vlist) {
return EdgeAccessor(storage::EdgeAddress(id_vlist.second), *this);
},
db_.storage().edges_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const EdgeAccessor &accessor) {
return accessor.Visible(transaction(), current_state);
},
std::move(accessors));
}
/**
* Creates and returns a new accessor that represents the same graph element
* (node / version) as the given `accessor`, but in this `GraphDbAccessor`.
*
* It is possible that the given `accessor` graph element is not visible in
* this `GraphDbAccessor`'s transaction. If that is the case, a `nullopt` is
* returned.
*
* The returned accessor does NOT have the same `current_` set as the given
* `accessor`. It has default post-construction `current_` set (`old` if
* available, otherwise `new`).
*
* @param accessor The [Vertex/Edge]Accessor whose underlying graph element we
* want in this GraphDbAccessor.
* @return See above.
* @tparam TAccessor Either VertexAccessor or EdgeAccessor
*/
template <typename TAccessor>
std::experimental::optional<TAccessor> Transfer(const TAccessor &accessor) {
if (accessor.db_accessor_ == this)
return std::experimental::make_optional(accessor);
TAccessor accessor_in_this(accessor.address(), *this);
if (accessor_in_this.current_)
return std::experimental::make_optional(std::move(accessor_in_this));
else
return std::experimental::nullopt;
}
/**
* Adds an index for the given (label, property) and populates it with
* existing vertices that belong to it.
*
* You should never call BuildIndex on a GraphDbAccessor (transaction) on
* which new vertices have been inserted or existing ones updated. Do it
* in a new accessor instead.
*
* Build index throws if an index for the given (label, property) already
* exists (even if it's being built by a concurrent transaction and is not yet
* ready for use).
*
* It also throws if there is another index being built concurrently on the
* same database this accessor is for.
*
* @param label - label to build for
* @param property - property to build for
*/
virtual void BuildIndex(storage::Label label, storage::Property property);
/// Populates index with vertices containing the key
void PopulateIndex(const LabelPropertyIndex::Key &key);
/// Writes Index (key) creation to wal, marks it as ready for usage
void EnableIndex(const LabelPropertyIndex::Key &key);
/**
* @brief - Returns true if the given label+property index already exists and
* is ready for use.
*/
bool LabelPropertyIndexExists(storage::Label label,
storage::Property property) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property));
}
/**
* @brief - Returns vector of keys of label-property indices.
*/
std::vector<LabelPropertyIndex::Key> GetIndicesKeys() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().label_property_index_.Keys();
}
/**
* Return approximate number of all vertices in the database.
* Note that this is always an over-estimate and never an under-estimate.
*/
int64_t VerticesCount() const;
/*
* Return approximate number of all edges in the database.
* Note that this is always an over-estimate and never an under-estimate.
*/
int64_t EdgesCount() const;
/**
* Return approximate number of vertices under indexes with the given label.
* Note that this is always an over-estimate and never an under-estimate.
*
* @param label - label to check for
* @return number of vertices with the given label
*/
int64_t VerticesCount(storage::Label label) const;
/**
* Return approximate number of vertices under indexes with the given label
* and property. Note that this is always an over-estimate and never an
* under-estimate.
*
* @param label - label to check for
* @param property - property to check for
* @return number of vertices with the given label, fails if no such
* label+property index exists.
*/
int64_t VerticesCount(storage::Label label, storage::Property property) const;
/**
* Returns approximate number of vertices that have the given label
* and the given value for the given property.
*
* Assumes that an index for that (label, property) exists.
*/
int64_t VerticesCount(storage::Label label, storage::Property property,
const PropertyValue &value) const;
/**
* Returns approximate number of vertices that have the given label
* and whose vaue is in the range defined by upper and lower @c Bound.
*
* At least one bound must be specified. Neither can be
* PropertyValue::Null.
*
* Assumes that an index for that (label, property) exists.
*/
int64_t VerticesCount(
storage::Label label, storage::Property property,
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
const std::experimental::optional<utils::Bound<PropertyValue>> upper)
const;
/**
* Obtains the Label for the label's name.
* @return See above.
*/
storage::Label Label(const std::string &label_name);
/**
* Obtains the label name (a string) for the given label.
*
* @param label a Label.
* @return See above.
*/
const std::string &LabelName(storage::Label label) const;
/**
* Obtains the EdgeType for it's name.
* @return See above.
*/
storage::EdgeType EdgeType(const std::string &edge_type_name);
/**
* Obtains the edge type name (a string) for the given edge type.
*
* @param edge_type an EdgeType.
* @return See above.
*/
const std::string &EdgeTypeName(storage::EdgeType edge_type) const;
/**
* Obtains the Property for it's name.
* @return See above.
*/
storage::Property Property(const std::string &property_name);
/**
* Obtains the property name (a string) for the given property.
*
* @param property a Property.
* @return See above.
*/
const std::string &PropertyName(storage::Property property) const;
/** Returns the id of this accessor's transaction */
tx::TransactionId transaction_id() const;
/** Advances transaction's command id by 1. */
virtual void AdvanceCommand();
/** Commit transaction. */
void Commit();
/** Abort transaction. */
void Abort();
/** Return true if transaction is hinted to abort. */
bool should_abort() const;
const tx::Transaction &transaction() const { return transaction_; }
durability::WriteAheadLog &wal();
auto &db() { return db_; }
const auto &db() const { return db_; }
/**
* Returns the current value of the counter with the given name, and
* increments that counter. If the counter with the given name does not exist,
* a new counter is created and this function returns 0.
*/
int64_t Counter(const std::string &name);
/**
* Sets the counter with the given name to the given value. Returns nothing.
* If the counter with the given name does not exist, a new counter is created
* and set to the given value.
*/
void CounterSet(const std::string &name, int64_t value);
/* Returns a list of index names present in the database. */
std::vector<std::string> IndexInfo() const;
/**
* Insert this vertex into corresponding label and label+property (if it
* exists) index.
*
* @param label - label with which to insert vertex label record
* @param vertex_accessor - vertex_accessor to insert
* @param vertex - vertex record to insert
*/
void UpdateLabelIndices(storage::Label label,
const VertexAccessor &vertex_accessor,
const Vertex *const vertex);
protected:
/** Called in `BuildIndex` after creating an index, but before populating. */
virtual void PostCreateIndex(const LabelPropertyIndex::Key &key) {}
/** Populates the index from a *new* transaction after creating the index. */
virtual void PopulateIndexFromBuildIndex(const LabelPropertyIndex::Key &key) {
PopulateIndex(key);
}
/**
* Insert a new edge to `from` vertex and return the address.
* Called from `InsertEdge` as the first step in edge insertion.
* */
virtual storage::EdgeAddress InsertEdgeOnFrom(
VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const std::experimental::optional<gid::Gid> &requested_gid,
const std::experimental::optional<int64_t> &cypher_id);
/**
* Set the newly created edge on `to` vertex.
* Called after `InsertEdgeOnFrom` in `InsertEdge`. The given `edge_address`
* is from the created edge, returned by `InsertEdgeOnFrom`.
*/
virtual void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address);
private:
GraphDb &db_;
tx::Transaction &transaction_;
// Indicates if this db-accessor started the transaction and should Abort it
// upon destruction.
bool transaction_starter_;
bool commited_{false};
bool aborted_{false};
/**
* Insert this vertex into corresponding any label + 'property' index.
* @param property - vertex will be inserted into indexes which contain this
* property
* @param vertex_accessor - vertex accessor to insert
* @param vertex - vertex to insert
*/
void UpdatePropertyIndex(storage::Property property,
const RecordAccessor<Vertex> &vertex_accessor,
const Vertex *const vertex);
};
} // namespace database

View File

@ -0,0 +1,52 @@
#include <limits>
#include "database/single_node/graph_db.hpp"
#include "utils/flag_validation.hpp"
#include "utils/string.hpp"
// Durability flags.
DEFINE_bool(durability_enabled, false,
"If durability (database persistence) should be enabled");
DEFINE_string(
durability_directory, "durability",
"Path to directory in which to save snapshots and write-ahead log files.");
DEFINE_bool(db_recover_on_startup, false, "Recover database on startup.");
DEFINE_VALIDATED_int32(
snapshot_cycle_sec, 3600,
"Amount of time between two snapshots, in seconds (min 60).",
FLAG_IN_RANGE(1, std::numeric_limits<int32_t>::max()));
DEFINE_int32(snapshot_max_retained, -1,
"Number of retained snapshots, -1 means without limit.");
DEFINE_bool(snapshot_on_exit, false, "Snapshot on exiting the database.");
// Misc flags
DEFINE_int32(query_execution_time_sec, 180,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of -1 means no limit.");
DEFINE_int32(gc_cycle_sec, 30,
"Amount of time between starts of two cleaning cycles in seconds. "
"-1 to turn off.");
// Data location.
DEFINE_string(properties_on_disk, "",
"Property names of properties which will be stored on available "
"disk. Property names have to be separated with comma (,).");
// Full durability.
DEFINE_bool(synchronous_commit, false,
"Should a transaction end wait for WAL records to be written to "
"disk before the transaction finishes.");
database::Config::Config()
// Durability flags.
: durability_enabled{FLAGS_durability_enabled},
durability_directory{FLAGS_durability_directory},
db_recover_on_startup{FLAGS_db_recover_on_startup},
snapshot_cycle_sec{FLAGS_snapshot_cycle_sec},
snapshot_max_retained{FLAGS_snapshot_max_retained},
snapshot_on_exit{FLAGS_snapshot_on_exit},
synchronous_commit{FLAGS_synchronous_commit},
// Misc flags.
gc_cycle_sec{FLAGS_gc_cycle_sec},
query_execution_time_sec{FLAGS_query_execution_time_sec},
// Data location.
properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")) {}

View File

@ -1,11 +1,11 @@
#include "database/graph_db.hpp"
#include "database/single_node/graph_db.hpp"
#include <experimental/optional>
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "database/single_node_counters.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "database/single_node/single_node_counters.hpp"
#include "durability/paths.hpp"
#include "durability/single_node/recovery.hpp"
#include "durability/single_node/snapshooter.hpp"

View File

@ -0,0 +1,163 @@
/// @file
#pragma once
#include <atomic>
#include <memory>
#include <vector>
#include "database/counters.hpp"
#include "durability/single_node/recovery.hpp"
#include "durability/single_node/wal.hpp"
#include "io/network/endpoint.hpp"
#include "storage/common/concurrent_id_mapper.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/storage.hpp"
#include "storage/single_node/storage_gc.hpp"
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
namespace database {
/// Database configuration. Initialized from flags, but modifiable.
struct Config {
Config();
// Durability flags.
bool durability_enabled;
std::string durability_directory;
bool db_recover_on_startup;
int snapshot_cycle_sec;
int snapshot_max_retained;
int snapshot_on_exit;
bool synchronous_commit;
// Misc flags.
int gc_cycle_sec;
int query_execution_time_sec;
// set of properties which will be stored on disk
std::vector<std::string> properties_on_disk;
// Distributed master/worker flags.
bool dynamic_graph_partitioner_enabled{false};
int rpc_num_client_workers{0};
int rpc_num_server_workers{0};
int worker_id{0};
io::network::Endpoint master_endpoint{"0.0.0.0", 0};
io::network::Endpoint worker_endpoint{"0.0.0.0", 0};
int recovering_cluster_size{0};
};
class GraphDbAccessor;
/// An abstract base class providing the interface for a graph database.
///
/// Always be sure that GraphDb object is destructed before main exits, i. e.
/// GraphDb object shouldn't be part of global/static variable, except if its
/// destructor is explicitly called before main exits. Consider code:
///
/// GraphDb db; // KeyIndex is created as a part of database::Storage
/// int main() {
/// GraphDbAccessor dba(db);
/// auto v = dba.InsertVertex();
/// v.add_label(dba.Label(
/// "Start")); // New SkipList is created in KeyIndex for LabelIndex.
/// // That SkipList creates SkipListGc which
/// // initialises static Executor object.
/// return 0;
/// }
///
/// After main exits: 1. Executor is destructed, 2. KeyIndex is destructed.
/// Destructor of KeyIndex calls delete on created SkipLists which destroy
/// SkipListGc that tries to use Excutioner object that doesn't exist anymore.
/// -> CRASH
class GraphDb {
public:
GraphDb() {}
GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete;
GraphDb &operator=(const GraphDb &) = delete;
GraphDb &operator=(GraphDb &&) = delete;
virtual ~GraphDb() {}
/// Create a new accessor by starting a new transaction.
virtual std::unique_ptr<GraphDbAccessor> Access() = 0;
/// Create an accessor for a running transaction.
virtual std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId) = 0;
virtual Storage &storage() = 0;
virtual durability::WriteAheadLog &wal() = 0;
virtual tx::Engine &tx_engine() = 0;
virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::EdgeType>
&edge_type_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::Property> &property_mapper() = 0;
virtual database::Counters &counters() = 0;
virtual void CollectGarbage() = 0;
/// Makes a snapshot from the visibility of the given accessor
virtual bool MakeSnapshot(GraphDbAccessor &accessor) = 0;
/// Releases the storage object safely and creates a new object.
/// This is needed because of recovery, otherwise we might try to recover into
/// a storage which has already been polluted because of a failed previous
/// recovery
virtual void ReinitializeStorage() = 0;
/// When this is false, no new transactions should be created.
bool is_accepting_transactions() const { return is_accepting_transactions_; }
protected:
std::atomic<bool> is_accepting_transactions_{true};
};
namespace impl {
class SingleNode;
} // namespace impl
class SingleNode final : public GraphDb {
public:
explicit SingleNode(Config config = Config());
~SingleNode();
std::unique_ptr<GraphDbAccessor> Access() override;
std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId) override;
Storage &storage() override;
durability::WriteAheadLog &wal() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
database::Counters &counters() override;
void CollectGarbage() override;
bool MakeSnapshot(GraphDbAccessor &accessor) override;
void ReinitializeStorage() override;
private:
std::unique_ptr<impl::SingleNode> impl_;
std::unique_ptr<utils::Scheduler> snapshot_creator_;
utils::Scheduler transaction_killer_;
};
class SingleNodeRecoveryTransanctions final
: public durability::RecoveryTransactions {
public:
explicit SingleNodeRecoveryTransanctions(SingleNode *db);
~SingleNodeRecoveryTransanctions();
void Begin(const tx::TransactionId &tx_id) override;
void Abort(const tx::TransactionId &tx_id) override;
void Commit(const tx::TransactionId &tx_id) override;
void Apply(const database::StateDelta &delta) override;
private:
SingleNode *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
} // namespace database

View File

@ -0,0 +1,478 @@
#include "database/single_node/graph_db_accessor.hpp"
#include <chrono>
#include <thread>
#include <glog/logging.h>
#include "durability/single_node/state_delta.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/edge.hpp"
#include "storage/single_node/edge_accessor.hpp"
#include "storage/single_node/vertex.hpp"
#include "storage/single_node/vertex_accessor.hpp"
#include "utils/cast.hpp"
#include "utils/on_scope_exit.hpp"
namespace database {
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db),
transaction_(*db.tx_engine().Begin()),
transaction_starter_{true} {}
GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id)
: db_(db),
transaction_(*db.tx_engine().RunningTransaction(tx_id)),
transaction_starter_{false} {}
GraphDbAccessor::~GraphDbAccessor() {
if (transaction_starter_ && !commited_ && !aborted_) {
this->Abort();
}
}
tx::TransactionId GraphDbAccessor::transaction_id() const {
return transaction_.id_;
}
void GraphDbAccessor::AdvanceCommand() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
db_.tx_engine().Advance(transaction_.id_);
}
void GraphDbAccessor::Commit() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
db_.tx_engine().Commit(transaction_);
commited_ = true;
}
void GraphDbAccessor::Abort() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
db_.tx_engine().Abort(transaction_);
aborted_ = true;
}
bool GraphDbAccessor::should_abort() const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return transaction_.should_abort();
}
durability::WriteAheadLog &GraphDbAccessor::wal() { return db_.wal(); }
VertexAccessor GraphDbAccessor::InsertVertex(
std::experimental::optional<gid::Gid> requested_gid,
std::experimental::optional<int64_t> cypher_id) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
auto gid = db_.storage().vertex_generator_.Next(requested_gid);
if (!cypher_id) cypher_id = utils::MemcpyCast<int64_t>(gid);
auto vertex_vlist =
new mvcc::VersionList<Vertex>(transaction_, gid, *cypher_id);
bool success =
db_.storage().vertices_.access().insert(gid, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing GID: "
<< gid;
wal().Emplace(database::StateDelta::CreateVertex(
transaction_.id_, vertex_vlist->gid_, vertex_vlist->cypher_id()));
auto va = VertexAccessor(storage::VertexAddress(vertex_vlist), *this);
return va;
}
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertexOptional(
gid::Gid gid, bool current_state) {
VertexAccessor record_accessor(
storage::VertexAddress(db_.storage().LocalAddress<Vertex>(gid)), *this);
if (!record_accessor.Visible(transaction(), current_state))
return std::experimental::nullopt;
return record_accessor;
}
VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) {
auto found = FindVertexOptional(gid, current_state);
CHECK(found) << "Unable to find vertex for id: " << gid;
return *found;
}
std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdgeOptional(
gid::Gid gid, bool current_state) {
EdgeAccessor record_accessor(
storage::EdgeAddress(db_.storage().LocalAddress<Edge>(gid)), *this);
if (!record_accessor.Visible(transaction(), current_state))
return std::experimental::nullopt;
return record_accessor;
}
EdgeAccessor GraphDbAccessor::FindEdge(gid::Gid gid, bool current_state) {
auto found = FindEdgeOptional(gid, current_state);
CHECK(found) << "Unable to find edge for id: " << gid;
return *found;
}
void GraphDbAccessor::BuildIndex(storage::Label label,
storage::Property property) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
db_.storage().index_build_tx_in_progress_.access().insert(transaction_.id_);
// on function exit remove the create index transaction from
// build_tx_in_progress
utils::OnScopeExit on_exit_1([this] {
auto removed = db_.storage().index_build_tx_in_progress_.access().remove(
transaction_.id_);
DCHECK(removed) << "Index creation transaction should be inside set";
});
// Create the index
const LabelPropertyIndex::Key key(label, property);
if (db_.storage().label_property_index_.CreateIndex(key) == false) {
throw IndexExistsException(
"Index is either being created by another transaction or already "
"exists.");
}
// Call the hook for inherited classes.
PostCreateIndex(key);
// Everything that happens after the line above ended will be added to the
// index automatically, but we still have to add to index everything that
// happened earlier. We have to first wait for every transaction that
// happend before, or a bit later than CreateIndex to end.
{
auto wait_transactions = transaction_.engine_.GlobalActiveTransactions();
auto active_index_creation_transactions =
db_.storage().index_build_tx_in_progress_.access();
for (auto id : wait_transactions) {
if (active_index_creation_transactions.contains(id)) continue;
while (transaction_.engine_.Info(id).is_active()) {
// Active index creation set could only now start containing that id,
// since that thread could have not written to the set set and to avoid
// dead-lock we need to make sure we keep track of that
if (active_index_creation_transactions.contains(id)) continue;
// TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
}
// This accessor's transaction surely sees everything that happened before
// CreateIndex.
auto dba = db_.Access();
// Add transaction to the build_tx_in_progress as this transaction doesn't
// change data and shouldn't block other parallel index creations
auto read_transaction_id = dba->transaction().id_;
db_.storage().index_build_tx_in_progress_.access().insert(
read_transaction_id);
// on function exit remove the read transaction from build_tx_in_progress
utils::OnScopeExit on_exit_2([read_transaction_id, this] {
auto removed = db_.storage().index_build_tx_in_progress_.access().remove(
read_transaction_id);
DCHECK(removed) << "Index building (read) transaction should be inside set";
});
dba->PopulateIndexFromBuildIndex(key);
dba->EnableIndex(key);
dba->Commit();
}
void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {
// Commit transaction as we finished applying method on newest visible
// records. Write that transaction's ID to the WAL as the index has been
// built at this point even if this DBA's transaction aborts for some
// reason.
auto wal_build_index_tx_id = transaction_id();
wal().Emplace(database::StateDelta::BuildIndex(
wal_build_index_tx_id, key.label_, LabelName(key.label_), key.property_,
PropertyName(key.property_)));
// After these two operations we are certain that everything is contained in
// the index under the assumption that the original index creation transaction
// contained no vertex/edge insert/update before this method was invoked.
db_.storage().label_property_index_.IndexFinishedBuilding(key);
}
void GraphDbAccessor::PopulateIndex(const LabelPropertyIndex::Key &key) {
for (auto vertex : Vertices(key.label_, false)) {
if (vertex.PropsAt(key.property_).type() == PropertyValue::Type::Null)
continue;
db_.storage().label_property_index_.UpdateOnLabelProperty(
vertex.address().local(), vertex.current_);
}
}
void GraphDbAccessor::UpdateLabelIndices(storage::Label label,
const VertexAccessor &vertex_accessor,
const Vertex *const vertex) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(vertex_accessor.is_local()) << "Only local vertices belong in indexes";
auto *vlist_ptr = vertex_accessor.address().local();
db_.storage().labels_index_.Update(label, vlist_ptr, vertex);
db_.storage().label_property_index_.UpdateOnLabel(label, vlist_ptr, vertex);
}
void GraphDbAccessor::UpdatePropertyIndex(
storage::Property property, const RecordAccessor<Vertex> &vertex_accessor,
const Vertex *const vertex) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(vertex_accessor.is_local()) << "Only local vertices belong in indexes";
db_.storage().label_property_index_.UpdateOnProperty(
property, vertex_accessor.address().local(), vertex);
}
int64_t GraphDbAccessor::VerticesCount() const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().vertices_.access().size();
}
int64_t GraphDbAccessor::VerticesCount(storage::Label label) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().labels_index_.Count(label);
}
int64_t GraphDbAccessor::VerticesCount(storage::Label label,
storage::Property property) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
const LabelPropertyIndex::Key key(label, property);
DCHECK(db_.storage().label_property_index_.IndexExists(key))
<< "Index doesn't exist.";
return db_.storage().label_property_index_.Count(key);
}
int64_t GraphDbAccessor::VerticesCount(storage::Label label,
storage::Property property,
const PropertyValue &value) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
const LabelPropertyIndex::Key key(label, property);
DCHECK(db_.storage().label_property_index_.IndexExists(key))
<< "Index doesn't exist.";
return db_.storage()
.label_property_index_.PositionAndCount(key, value)
.second;
}
int64_t GraphDbAccessor::VerticesCount(
storage::Label label, storage::Property property,
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
const std::experimental::optional<utils::Bound<PropertyValue>> upper)
const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
const LabelPropertyIndex::Key key(label, property);
DCHECK(db_.storage().label_property_index_.IndexExists(key))
<< "Index doesn't exist.";
CHECK(lower || upper) << "At least one bound must be provided";
CHECK(!lower || lower.value().value().type() != PropertyValue::Type::Null)
<< "Null value is not a valid index bound";
CHECK(!upper || upper.value().value().type() != PropertyValue::Type::Null)
<< "Null value is not a valid index bound";
if (!upper) {
auto lower_pac = db_.storage().label_property_index_.PositionAndCount(
key, lower.value().value());
int64_t size = db_.storage().label_property_index_.Count(key);
return std::max(0l,
size - lower_pac.first -
(lower.value().IsInclusive() ? 0l : lower_pac.second));
} else if (!lower) {
auto upper_pac = db_.storage().label_property_index_.PositionAndCount(
key, upper.value().value());
return upper.value().IsInclusive() ? upper_pac.first + upper_pac.second
: upper_pac.first;
} else {
auto lower_pac = db_.storage().label_property_index_.PositionAndCount(
key, lower.value().value());
auto upper_pac = db_.storage().label_property_index_.PositionAndCount(
key, upper.value().value());
auto result = upper_pac.first - lower_pac.first;
if (lower.value().IsExclusive()) result -= lower_pac.second;
if (upper.value().IsInclusive()) result += upper_pac.second;
return std::max(0l, result);
}
}
bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
vertex_accessor.SwitchNew();
// it's possible the vertex was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
if (vertex_accessor.current().is_expired_by(transaction_)) return true;
if (check_empty &&
vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0)
return false;
auto *vlist_ptr = vertex_accessor.address().local();
wal().Emplace(database::StateDelta::RemoveVertex(
transaction_.id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, transaction_);
return true;
}
void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
vertex_accessor.SwitchNew();
// Note that when we call RemoveEdge we must take care not to delete from the
// collection we are iterating over. This invalidates the iterator in a subtle
// way that does not fail in tests, but is NOT correct.
for (auto edge_accessor : vertex_accessor.in())
RemoveEdge(edge_accessor, true, false);
vertex_accessor.SwitchNew();
for (auto edge_accessor : vertex_accessor.out())
RemoveEdge(edge_accessor, false, true);
RemoveVertex(vertex_accessor, false);
}
EdgeAccessor GraphDbAccessor::InsertEdge(
VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type,
std::experimental::optional<gid::Gid> requested_gid,
std::experimental::optional<int64_t> cypher_id) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
auto edge_address =
InsertEdgeOnFrom(&from, &to, edge_type, requested_gid, cypher_id);
InsertEdgeOnTo(&from, &to, edge_type, edge_address);
return EdgeAccessor(edge_address, *this, from.address(), to.address(),
edge_type);
}
storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom(
VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const std::experimental::optional<gid::Gid> &requested_gid,
const std::experimental::optional<int64_t> &cypher_id) {
auto edge_accessor = InsertOnlyEdge(from->address(), to->address(), edge_type,
requested_gid, cypher_id);
auto edge_address = edge_accessor.address();
from->SwitchNew();
auto from_updated = &from->update();
// TODO when preparing WAL for distributed, most likely never use
// `CREATE_EDGE`, but always have it split into 3 parts (edge insertion,
// in/out modification).
wal().Emplace(database::StateDelta::CreateEdge(
transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(),
from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type)));
from_updated->out_.emplace(
db_.storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
return edge_address;
}
void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address) {
// Ensure that the "to" accessor has the latest version (switch new).
// WARNING: Must do that after the above "from->update()" for cases when
// we are creating a cycle and "from" and "to" are the same vlist.
to->SwitchNew();
auto *to_updated = &to->update();
to_updated->in_.emplace(
db_.storage().LocalizedAddressIfPossible(from->address()), edge_address,
edge_type);
}
EdgeAccessor GraphDbAccessor::InsertOnlyEdge(
storage::VertexAddress from, storage::VertexAddress to,
storage::EdgeType edge_type,
std::experimental::optional<gid::Gid> requested_gid,
std::experimental::optional<int64_t> cypher_id) {
CHECK(from.is_local())
<< "`from` address should be local when calling InsertOnlyEdge";
auto gid = db_.storage().edge_generator_.Next(requested_gid);
if (!cypher_id) cypher_id = utils::MemcpyCast<int64_t>(gid);
auto edge_vlist = new mvcc::VersionList<Edge>(transaction_, gid, *cypher_id,
from, to, edge_type);
// We need to insert edge_vlist to edges_ before calling update since update
// can throw and edge_vlist will not be garbage collected if it is not in
// edges_ skiplist.
bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second;
CHECK(success) << "Attempting to insert an edge with an existing GID: "
<< gid;
auto ea = EdgeAccessor(storage::EdgeAddress(edge_vlist), *this, from, to,
edge_type);
return ea;
}
int64_t GraphDbAccessor::EdgesCount() const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().edges_.access().size();
}
void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
bool remove_in_edge) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// it's possible the edge was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
edge.SwitchNew();
if (edge.current().is_expired_by(transaction_)) return;
if (remove_out_edge) edge.from().RemoveOutEdge(edge.address());
if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
edge.address().local()->remove(edge.current_, transaction_);
wal().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
}
storage::Label GraphDbAccessor::Label(const std::string &label_name) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.label_mapper().value_to_id(label_name);
}
const std::string &GraphDbAccessor::LabelName(storage::Label label) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.label_mapper().id_to_value(label);
}
storage::EdgeType GraphDbAccessor::EdgeType(const std::string &edge_type_name) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.edge_type_mapper().value_to_id(edge_type_name);
}
const std::string &GraphDbAccessor::EdgeTypeName(
storage::EdgeType edge_type) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.edge_type_mapper().id_to_value(edge_type);
}
storage::Property GraphDbAccessor::Property(const std::string &property_name) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.property_mapper().value_to_id(property_name);
}
const std::string &GraphDbAccessor::PropertyName(
storage::Property property) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.property_mapper().id_to_value(property);
}
int64_t GraphDbAccessor::Counter(const std::string &name) {
return db_.counters().Get(name);
}
void GraphDbAccessor::CounterSet(const std::string &name, int64_t value) {
db_.counters().Set(name, value);
}
std::vector<std::string> GraphDbAccessor::IndexInfo() const {
std::vector<std::string> info;
for (storage::Label label : db_.storage().labels_index_.Keys()) {
info.emplace_back(":" + LabelName(label));
}
for (LabelPropertyIndex::Key key :
db_.storage().label_property_index_.Keys()) {
info.emplace_back(fmt::format(":{}({})", LabelName(key.label_),
PropertyName(key.property_)));
}
return info;
}
} // namespace database

View File

@ -0,0 +1,672 @@
/// @file
#pragma once
#include <experimental/optional>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <cppitertools/filter.hpp>
#include <cppitertools/imap.hpp>
#include "database/single_node/graph_db.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/edge_accessor.hpp"
#include "storage/single_node/vertex_accessor.hpp"
#include "transactions/transaction.hpp"
#include "transactions/type.hpp"
#include "utils/bound.hpp"
#include "utils/exceptions.hpp"
namespace database {
/** Thrown when creating an index which already exists. */
class IndexExistsException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/** Thrown when creating an index which already exists. */
class IndexCreationOnWorkerException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/**
* Base accessor for the database object: exposes functions for operating on the
* database. All the functions in this class should be self-sufficient: for
* example the function for creating a new Vertex should take care of all the
* book-keeping around the creation.
*/
class GraphDbAccessor {
// We need to make friends with this guys since they need to access private
// methods for updating indices.
// TODO: Rethink this, we have too much long-distance friendship complicating
// the code.
friend class ::RecordAccessor<Vertex>;
friend class ::VertexAccessor;
protected:
// Construction should only be done through GraphDb::Access function and
// concrete GraphDbAccessor type.
/// Creates a new accessor by starting a new transaction.
explicit GraphDbAccessor(GraphDb &db);
/// Creates an accessor for a running transaction.
GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id);
public:
virtual ~GraphDbAccessor();
GraphDbAccessor(const GraphDbAccessor &other) = delete;
GraphDbAccessor(GraphDbAccessor &&other) = delete;
GraphDbAccessor &operator=(const GraphDbAccessor &other) = delete;
GraphDbAccessor &operator=(GraphDbAccessor &&other) = delete;
virtual ::VertexAccessor::Impl *GetVertexImpl() = 0;
virtual ::RecordAccessor<Edge>::Impl *GetEdgeImpl() = 0;
/**
* Creates a new Vertex and returns an accessor to it. If the ID is
* provided, the created Vertex will have that local ID, and the ID counter
* will be increased to it so collisions are avoided. This should only be used
* by durability recovery, normal vertex creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*
* @return See above.
*/
VertexAccessor InsertVertex(std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Removes the vertex of the given accessor. If the vertex has any outgoing or
* incoming edges, it is not deleted. See `DetachRemoveVertex` if you want to
* remove a vertex regardless of connectivity.
*
* If the vertex has already been deleted by the current transaction+command,
* this function will not do anything and will return true.
*
* @param vertex_accessor Accessor to vertex.
* @param check_empty If the vertex should be checked for existing edges
* before deletion.
* @return If or not the vertex was deleted.
*/
virtual bool RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty = true);
/**
* Removes the vertex of the given accessor along with all it's outgoing
* and incoming connections.
*
* @param vertex_accessor Accessor to a vertex.
*/
void DetachRemoveVertex(VertexAccessor &vertex_accessor);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param gid - The GID of the sought vertex.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<VertexAccessor> FindVertexOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed
* using a CHECK.
*
* @param gid - The GID of the sought vertex.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
VertexAccessor FindVertex(gid::Gid gid, bool current_state);
/**
* Returns iterable over accessors to all the vertices in the graph
* visible to the current transaction.
*
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
auto Vertices(bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors = iter::imap(
[this](auto id_vlist) {
return VertexAccessor(storage::VertexAddress(id_vlist.second), *this);
},
db_.storage().vertices_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const VertexAccessor &accessor) {
return accessor.Visible(transaction(), current_state);
},
std::move(accessors));
}
/**
* Return VertexAccessors which contain the current label for the current
* transaction visibilty.
* @param label - label for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().labels_index_.GetVlists(label, transaction_,
current_state));
}
/**
* Return VertexAccessors which contain the current label and property for the
* given transaction visibility.
*
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, storage::Property property,
bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), transaction_,
current_state));
}
/**
* Return VertexAccessors which contain the current label + property, and
* those properties are equal to this 'value' for the given transaction
* visibility.
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param value - property value for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection
*/
auto Vertices(storage::Label label, storage::Property property,
const PropertyValue &value, bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
CHECK(value.type() != PropertyValue::Type::Null)
<< "Can't query index for propery value type null.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), value, transaction_,
current_state));
}
/**
* Return an iterable over VertexAccessors which contain the
* given label and whose property value (for the given property)
* falls within the given (lower, upper) @c Bound.
*
* The returned iterator will only contain
* vertices/edges whose property value is comparable with the
* given bounds (w.r.t. type). This has implications on Cypher
* query execuction semantics which have not been resovled yet.
*
* At least one of the bounds must be specified. Bonds can't be
* @c PropertyValue::Null. If both bounds are
* specified, their PropertyValue elments must be of comparable
* types.
*
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param lower - Lower bound of the interval.
* @param upper - Upper bound of the interval.
* @param value - property value for which to return VertexAccessors
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
* @return iterable collection of record accessors
* satisfy the bounds and are visible to the current transaction.
*/
auto Vertices(
storage::Label label, storage::Property property,
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
const std::experimental::optional<utils::Bound<PropertyValue>> upper,
bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(storage::VertexAddress(vlist), *this);
},
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), lower, upper,
transaction_, current_state));
}
/**
* Creates a new Edge and returns an accessor to it. If the ID is
* provided, the created Edge will have that ID, and the ID counter will be
* increased to it so collisions are avoided. This should only be used by
* durability recovery, normal edge creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param from The 'from' vertex.
* @param to The 'to' vertex'
* @param type Edge type.
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*
* @return An accessor to the edge.
*/
EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to,
storage::EdgeType type,
std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Insert edge into main storage, but don't insert it into from and to
* vertices edge lists.
*
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
*/
EdgeAccessor InsertOnlyEdge(storage::VertexAddress from,
storage::VertexAddress to,
storage::EdgeType edge_type,
std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
/**
* Removes an edge from the graph. Parameters can indicate if the edge should
* be removed from data structures in vertices it connects. When removing an
* edge both arguments should be `true`. `false` is only used when
* detach-deleting a vertex.
*
* @param edge The accessor to an edge.
* @param remove_out_edge If the edge should be removed from the its origin
* side.
* @param remove_in_edge If the edge should be removed from the its
* destination side.
*/
virtual void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true,
bool remove_in_edge = true);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param gid - The GID of the sought edge.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<EdgeAccessor> FindEdgeOptional(
gid::Gid gid, bool current_state);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, MG is crashed
* using a CHECK.
*
* @param gid - The GID of the sought edge.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
EdgeAccessor FindEdge(gid::Gid gid, bool current_state);
/**
* Returns iterable over accessors to all the edges in the graph
* visible to the current transaction.
*
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
auto Edges(bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors = iter::imap(
[this](auto id_vlist) {
return EdgeAccessor(storage::EdgeAddress(id_vlist.second), *this);
},
db_.storage().edges_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const EdgeAccessor &accessor) {
return accessor.Visible(transaction(), current_state);
},
std::move(accessors));
}
/**
* Creates and returns a new accessor that represents the same graph element
* (node / version) as the given `accessor`, but in this `GraphDbAccessor`.
*
* It is possible that the given `accessor` graph element is not visible in
* this `GraphDbAccessor`'s transaction. If that is the case, a `nullopt` is
* returned.
*
* The returned accessor does NOT have the same `current_` set as the given
* `accessor`. It has default post-construction `current_` set (`old` if
* available, otherwise `new`).
*
* @param accessor The [Vertex/Edge]Accessor whose underlying graph element we
* want in this GraphDbAccessor.
* @return See above.
* @tparam TAccessor Either VertexAccessor or EdgeAccessor
*/
template <typename TAccessor>
std::experimental::optional<TAccessor> Transfer(const TAccessor &accessor) {
if (accessor.db_accessor_ == this)
return std::experimental::make_optional(accessor);
TAccessor accessor_in_this(accessor.address(), *this);
if (accessor_in_this.current_)
return std::experimental::make_optional(std::move(accessor_in_this));
else
return std::experimental::nullopt;
}
/**
* Adds an index for the given (label, property) and populates it with
* existing vertices that belong to it.
*
* You should never call BuildIndex on a GraphDbAccessor (transaction) on
* which new vertices have been inserted or existing ones updated. Do it
* in a new accessor instead.
*
* Build index throws if an index for the given (label, property) already
* exists (even if it's being built by a concurrent transaction and is not yet
* ready for use).
*
* It also throws if there is another index being built concurrently on the
* same database this accessor is for.
*
* @param label - label to build for
* @param property - property to build for
*/
virtual void BuildIndex(storage::Label label, storage::Property property);
/// Populates index with vertices containing the key
void PopulateIndex(const LabelPropertyIndex::Key &key);
/// Writes Index (key) creation to wal, marks it as ready for usage
void EnableIndex(const LabelPropertyIndex::Key &key);
/**
* @brief - Returns true if the given label+property index already exists and
* is ready for use.
*/
bool LabelPropertyIndexExists(storage::Label label,
storage::Property property) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().label_property_index_.IndexExists(
LabelPropertyIndex::Key(label, property));
}
/**
* @brief - Returns vector of keys of label-property indices.
*/
std::vector<LabelPropertyIndex::Key> GetIndicesKeys() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.storage().label_property_index_.Keys();
}
/**
* Return approximate number of all vertices in the database.
* Note that this is always an over-estimate and never an under-estimate.
*/
int64_t VerticesCount() const;
/*
* Return approximate number of all edges in the database.
* Note that this is always an over-estimate and never an under-estimate.
*/
int64_t EdgesCount() const;
/**
* Return approximate number of vertices under indexes with the given label.
* Note that this is always an over-estimate and never an under-estimate.
*
* @param label - label to check for
* @return number of vertices with the given label
*/
int64_t VerticesCount(storage::Label label) const;
/**
* Return approximate number of vertices under indexes with the given label
* and property. Note that this is always an over-estimate and never an
* under-estimate.
*
* @param label - label to check for
* @param property - property to check for
* @return number of vertices with the given label, fails if no such
* label+property index exists.
*/
int64_t VerticesCount(storage::Label label, storage::Property property) const;
/**
* Returns approximate number of vertices that have the given label
* and the given value for the given property.
*
* Assumes that an index for that (label, property) exists.
*/
int64_t VerticesCount(storage::Label label, storage::Property property,
const PropertyValue &value) const;
/**
* Returns approximate number of vertices that have the given label
* and whose vaue is in the range defined by upper and lower @c Bound.
*
* At least one bound must be specified. Neither can be
* PropertyValue::Null.
*
* Assumes that an index for that (label, property) exists.
*/
int64_t VerticesCount(
storage::Label label, storage::Property property,
const std::experimental::optional<utils::Bound<PropertyValue>> lower,
const std::experimental::optional<utils::Bound<PropertyValue>> upper)
const;
/**
* Obtains the Label for the label's name.
* @return See above.
*/
storage::Label Label(const std::string &label_name);
/**
* Obtains the label name (a string) for the given label.
*
* @param label a Label.
* @return See above.
*/
const std::string &LabelName(storage::Label label) const;
/**
* Obtains the EdgeType for it's name.
* @return See above.
*/
storage::EdgeType EdgeType(const std::string &edge_type_name);
/**
* Obtains the edge type name (a string) for the given edge type.
*
* @param edge_type an EdgeType.
* @return See above.
*/
const std::string &EdgeTypeName(storage::EdgeType edge_type) const;
/**
* Obtains the Property for it's name.
* @return See above.
*/
storage::Property Property(const std::string &property_name);
/**
* Obtains the property name (a string) for the given property.
*
* @param property a Property.
* @return See above.
*/
const std::string &PropertyName(storage::Property property) const;
/** Returns the id of this accessor's transaction */
tx::TransactionId transaction_id() const;
/** Advances transaction's command id by 1. */
virtual void AdvanceCommand();
/** Commit transaction. */
void Commit();
/** Abort transaction. */
void Abort();
/** Return true if transaction is hinted to abort. */
bool should_abort() const;
const tx::Transaction &transaction() const { return transaction_; }
durability::WriteAheadLog &wal();
auto &db() { return db_; }
const auto &db() const { return db_; }
/**
* Returns the current value of the counter with the given name, and
* increments that counter. If the counter with the given name does not exist,
* a new counter is created and this function returns 0.
*/
int64_t Counter(const std::string &name);
/**
* Sets the counter with the given name to the given value. Returns nothing.
* If the counter with the given name does not exist, a new counter is created
* and set to the given value.
*/
void CounterSet(const std::string &name, int64_t value);
/* Returns a list of index names present in the database. */
std::vector<std::string> IndexInfo() const;
/**
* Insert this vertex into corresponding label and label+property (if it
* exists) index.
*
* @param label - label with which to insert vertex label record
* @param vertex_accessor - vertex_accessor to insert
* @param vertex - vertex record to insert
*/
void UpdateLabelIndices(storage::Label label,
const VertexAccessor &vertex_accessor,
const Vertex *const vertex);
protected:
/** Called in `BuildIndex` after creating an index, but before populating. */
virtual void PostCreateIndex(const LabelPropertyIndex::Key &key) {}
/** Populates the index from a *new* transaction after creating the index. */
virtual void PopulateIndexFromBuildIndex(const LabelPropertyIndex::Key &key) {
PopulateIndex(key);
}
/**
* Insert a new edge to `from` vertex and return the address.
* Called from `InsertEdge` as the first step in edge insertion.
* */
virtual storage::EdgeAddress InsertEdgeOnFrom(
VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const std::experimental::optional<gid::Gid> &requested_gid,
const std::experimental::optional<int64_t> &cypher_id);
/**
* Set the newly created edge on `to` vertex.
* Called after `InsertEdgeOnFrom` in `InsertEdge`. The given `edge_address`
* is from the created edge, returned by `InsertEdgeOnFrom`.
*/
virtual void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address);
private:
GraphDb &db_;
tx::Transaction &transaction_;
// Indicates if this db-accessor started the transaction and should Abort it
// upon destruction.
bool transaction_starter_;
bool commited_{false};
bool aborted_{false};
/**
* Insert this vertex into corresponding any label + 'property' index.
* @param property - vertex will be inserted into indexes which contain this
* property
* @param vertex_accessor - vertex accessor to insert
* @param vertex - vertex to insert
*/
void UpdatePropertyIndex(storage::Property property,
const RecordAccessor<Vertex> &vertex_accessor,
const Vertex *const vertex);
};
} // namespace database

View File

@ -1,6 +1,6 @@
#include "bfs_rpc_clients.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "distributed/bfs_rpc_messages.hpp"
#include "distributed/data_manager.hpp"

View File

@ -2,7 +2,7 @@
#include <unordered_map>
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "query/plan/operator.hpp"

View File

@ -7,7 +7,7 @@
#include "glog/logging.h"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "query/context.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpret/eval.hpp"

View File

@ -3,7 +3,7 @@
#pragma once
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/cache.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "transactions/type.hpp"

View File

@ -2,8 +2,8 @@
#include <memory>
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/data_rpc_messages.hpp"
namespace distributed {

View File

@ -1,7 +1,7 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "database/graph_db.hpp"
#include "database/distributed/graph_db.hpp"
namespace database {
class DistributedGraphDb;

View File

@ -4,8 +4,8 @@
#include <unordered_map>
#include <vector>
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "distributed/dgp/vertex_migrator.hpp"

View File

@ -1,7 +1,7 @@
#include "distributed/dgp/vertex_migrator.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "query/typed_value.hpp"
namespace distributed::dgp {

View File

@ -1,7 +1,7 @@
#include "distributed/durability_rpc_worker.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/durability_rpc_messages.hpp"
namespace distributed {

View File

@ -1,6 +1,6 @@
#include "distributed/dynamic_worker.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "distributed/dynamic_worker_rpc_messages.hpp"
namespace distributed {

View File

@ -1,8 +1,8 @@
#include "distributed/index_rpc_server.hpp"
#include "communication/rpc/server.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/index_rpc_messages.hpp"
namespace distributed {

View File

@ -1,6 +1,6 @@
#include "distributed/produce_rpc_server.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "query/common.hpp"

View File

@ -8,8 +8,8 @@
#include <vector>
#include "communication/rpc/server.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "query/context.hpp"

View File

@ -18,8 +18,8 @@ cpp<#
(lcp:in-impl
#>cpp
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
cpp<#)

View File

@ -2,7 +2,7 @@
#include <vector>
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/coordination.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "query/context.hpp"

View File

@ -4,7 +4,7 @@
#include <unordered_map>
#include "communication/rpc/messages.hpp"
#include "database/serialization.hpp"
#include "database/distributed/serialization.hpp"
#include "distributed/updates_rpc_messages.capnp.h"
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/address_types.hpp"
@ -18,7 +18,7 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'db "/database/serialization.capnp")
(lcp:capnp-import 'db "/database/distributed/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")

View File

@ -9,8 +9,8 @@
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "durability/distributed/state_delta.hpp"
#include "query/typed_value.hpp"

View File

@ -4,7 +4,7 @@
#include <limits>
#include <unordered_map>
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "durability/distributed/snapshot_decoder.hpp"
#include "durability/distributed/snapshot_value.hpp"
#include "durability/distributed/version.hpp"

View File

@ -4,7 +4,7 @@
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "durability/distributed/snapshot_encoder.hpp"
#include "durability/distributed/version.hpp"
#include "durability/hashed_file_writer.hpp"

View File

@ -2,7 +2,7 @@
#include <experimental/filesystem>
#include "database/graph_db.hpp"
#include "database/distributed/graph_db.hpp"
namespace durability {

View File

@ -1,7 +1,7 @@
#pragma once
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "glue/communication.hpp"
#include "utils/cast.hpp"

View File

@ -3,7 +3,7 @@
#include <string>
#include "communication/bolt/v1/value.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "glue/communication.hpp"
namespace database {

View File

@ -4,7 +4,7 @@
#include <limits>
#include <unordered_map>
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp"
#include "durability/single_node/snapshot_decoder.hpp"

View File

@ -4,7 +4,7 @@
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/single_node/snapshot_encoder.hpp"

View File

@ -2,7 +2,7 @@
#include <experimental/filesystem>
#include "database/graph_db.hpp"
#include "database/single_node/graph_db.hpp"
namespace durability {

View File

@ -1,7 +1,7 @@
#pragma once
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "glue/communication.hpp"
#include "utils/cast.hpp"

View File

@ -3,7 +3,7 @@
#include <string>
#include "communication/bolt/v1/value.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "glue/communication.hpp"
namespace database {

View File

@ -10,7 +10,7 @@
#include <glog/logging.h>
#include "communication/server.hpp"
#include "database/graph_db.hpp"
#include "database/single_node/graph_db.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"

View File

@ -10,8 +10,7 @@
#include <glog/logging.h>
#include "communication/server.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"

View File

@ -1,6 +1,6 @@
#include "query/distributed_interpreter.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "query/plan/distributed.hpp"
#include "query/plan/planner.hpp"

View File

@ -1,6 +1,6 @@
#include "query/plan/distributed_ops.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "distributed/pull_rpc_clients.hpp"

View File

@ -1,6 +1,6 @@
#include "storage/edge_accessor.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "storage/vertex_accessor.hpp"
#include "utils/algorithm.hpp"

View File

@ -2,7 +2,7 @@
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/vertex.hpp"

View File

@ -1,6 +1,6 @@
#include "storage/distributed/serialization.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
namespace storage {

View File

@ -2,7 +2,7 @@
#include <algorithm>
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "durability/distributed/state_delta.hpp"
#include "utils/algorithm.hpp"

View File

@ -1,6 +1,6 @@
#include "storage/edge_accessor.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "storage/vertex_accessor.hpp"
#include "utils/algorithm.hpp"

View File

@ -2,7 +2,7 @@
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "durability/single_node/state_delta.hpp"
#include "storage/single_node/edge.hpp"
#include "storage/single_node/vertex.hpp"

View File

@ -2,7 +2,7 @@
#include <algorithm>
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "durability/single_node/state_delta.hpp"
#include "utils/algorithm.hpp"

View File

@ -8,7 +8,7 @@
#include <vector>
#include "data_structures/concurrent/skiplist.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "mvcc/single_node/version_list.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"

View File

@ -3,15 +3,15 @@
#include <glog/logging.h>
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/interpreter.hpp"
#include "query/typed_value.hpp"
class ExpansionBenchFixture : public benchmark::Fixture {
protected:
// GraphDb shouldn't be global constructed/destructed. See
// documentation in database/graph_db.hpp for details.
// documentation in database/single_node/graph_db.hpp for details.
std::experimental::optional<database::SingleNode> db_;
query::Interpreter interpreter_;

View File

@ -2,8 +2,8 @@
#include <benchmark/benchmark_api.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/plan/cost_estimator.hpp"
#include "query/plan/planner.hpp"

View File

@ -11,7 +11,7 @@
#include <gtest/gtest.h>
#include "communication/server.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
static constexpr const int SIZE = 60000;
static constexpr const int REPLY = 10;

View File

@ -13,7 +13,7 @@
#include <gtest/gtest.h>
#include "communication/server.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
static constexpr const char interface[] = "127.0.0.1";

View File

@ -12,7 +12,7 @@
#include <glog/logging.h>
#include "json/json.hpp"
#include "database/graph_db.hpp"
#include "database/single_node/graph_db.hpp"
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"
#include "utils/flag_validation.hpp"

View File

@ -8,8 +8,8 @@
#include <gtest/gtest.h>
#include "communication/result_stream_faker.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "glue/communication.hpp"
#include "query/distributed_interpreter.hpp"
#include "query/typed_value.hpp"

View File

@ -8,7 +8,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "query/distributed_interpreter.hpp"
#include "query/repl.hpp"
#include "utils/file.hpp"

View File

@ -10,8 +10,8 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/context.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"

View File

@ -5,7 +5,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "database/graph_db.hpp"
#include "database/single_node/graph_db.hpp"
#include "query/repl.hpp"
#include "query/interpreter.hpp"
#include "utils/random_graph_generator.hpp"

View File

@ -1,6 +1,6 @@
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/interpreter.hpp"
int main(int argc, char *argv[]) {

View File

@ -8,8 +8,8 @@
#include <rapidcheck.h>
#include <rapidcheck/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "storage/vertex_accessor.hpp"
/**

View File

@ -2,8 +2,8 @@
#include "bolt_testdata.hpp"
#include "communication/bolt/v1/encoder/encoder.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "glue/communication.hpp"
using communication::bolt::Value;

View File

@ -2,7 +2,7 @@
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "database/distributed_counters.hpp"
#include "database/distributed/distributed_counters.hpp"
const std::string kLocal = "127.0.0.1";

View File

@ -1,8 +1,8 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/vertex.hpp"
#include "transactions/single_node/engine_single_node.hpp"

View File

@ -1,7 +1,7 @@
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/indexes/label_property_index.hpp"
#include "transactions/single_node/engine_single_node.hpp"

View File

@ -1,7 +1,7 @@
#include "gtest/gtest.h"
#include "config.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
TEST(DatabaseMaster, Instantiate) {
database::Config config;

View File

@ -5,8 +5,8 @@
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "storage/distributed/address_types.hpp"

View File

@ -2,7 +2,7 @@
#include "distributed_common.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "durability/distributed/snapshooter.hpp"
#include "durability/distributed/version.hpp"
#include "durability/paths.hpp"

View File

@ -3,8 +3,7 @@
#include "gtest/gtest.h"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "distributed_common.hpp"
#include "io/network/endpoint.hpp"
#include "query_plan_common.hpp"

View File

@ -4,7 +4,7 @@
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/coordination.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"

View File

@ -5,7 +5,7 @@
#include "gtest/gtest.h"
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/pull_rpc_clients.hpp"

View File

@ -4,7 +4,7 @@
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/coordination.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"

View File

@ -3,7 +3,7 @@
#include <gtest/gtest.h>
#include "database/graph_db_accessor.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "query/typed_value.hpp"

View File

@ -12,9 +12,9 @@
#include <gtest/gtest.h>
// TODO: FIXME
// #include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
// #include "database/distributed/distributed_graph_db.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp"
#include "durability/single_node/recovery.hpp"

View File

@ -2,8 +2,8 @@
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/indexes/label_property_index.hpp"

View File

@ -2,8 +2,8 @@
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "storage/common/types.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex_accessor.hpp"

View File

@ -5,8 +5,8 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "utils/bound.hpp"
using testing::UnorderedElementsAreArray;

View File

@ -1,7 +1,7 @@
#include <cstdlib>
#include "communication/result_stream_faker.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "query/exceptions.hpp"

View File

@ -1,8 +1,8 @@
#include <gtest/gtest.h>
#include <memory>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/cost_estimator.hpp"

View File

@ -7,7 +7,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/context.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/opencypher/parser.hpp"

View File

@ -7,7 +7,7 @@
#include "gtest/gtest.h"
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "database/single_node/graph_db.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/plan/operator.hpp"

View File

@ -8,8 +8,8 @@
#include "gtest/gtest.h"
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/interpreter.hpp"
DECLARE_bool(query_cost_planner);

View File

@ -14,7 +14,7 @@
#include <gtest/gtest.h>
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "database/single_node/graph_db.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/plan/operator.hpp"

View File

@ -3,8 +3,8 @@
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/frontend/semantic/symbol_table.hpp"

View File

@ -3,8 +3,8 @@
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "mvcc/single_node/version_list.hpp"
#include "storage/common/property_value.hpp"
#include "storage/single_node/address.hpp"

View File

@ -1,7 +1,7 @@
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "durability/single_node/state_delta.hpp"
TEST(StateDelta, CreateVertex) {

View File

@ -9,7 +9,7 @@
#include "gtest/gtest.h"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "query/typed_value.hpp"
using query::TypedValue;

View File

@ -4,8 +4,8 @@
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/single_node/graph_db.hpp"
#include "database/single_node/graph_db_accessor.hpp"
#include "durability/single_node/recovery.hpp"
#include "query/typed_value.hpp"