Typed-value migration in progress. UNSTABLE STATE

This commit is contained in:
Florijan Stamenkovic 2017-02-06 12:40:55 +01:00
parent b374ae1dbb
commit b38704391c
25 changed files with 234 additions and 675 deletions

View File

@ -280,7 +280,7 @@ add_library(cypher_lib STATIC ${CMAKE_BINARY_DIR}/cypher.cpp)
set(memgraph_src_files
${src_dir}/config/config.cpp
${src_dir}/dbms/dbms.cpp
${src_dir}/dbms/cleaner.cpp
# ${src_dir}/dbms/cleaner.cpp
${src_dir}/utils/string/transform.cpp
${src_dir}/utils/string/join.cpp
${src_dir}/utils/string/file.cpp
@ -297,17 +297,17 @@ set(memgraph_src_files
${src_dir}/communication/bolt/v1/serialization/bolt_serializer.cpp
${src_dir}/threading/thread.cpp
${src_dir}/mvcc/id.cpp
${src_dir}/snapshot/snapshot_engine.cpp
${src_dir}/snapshot/snapshoter.cpp
${src_dir}/snapshot/snapshot_encoder.cpp
${src_dir}/snapshot/snapshot_decoder.cpp
${src_dir}/storage/vertices.cpp
${src_dir}/storage/edges.cpp
# ${src_dir}/snapshot/snapshot_engine.cpp
# ${src_dir}/snapshot/snapshoter.cpp
# ${src_dir}/snapshot/snapshot_encoder.cpp
# ${src_dir}/snapshot/snapshot_decoder.cpp
${src_dir}/storage/typed_value.cpp
${src_dir}/storage/typed_value_store.cpp
${src_dir}/storage/locking/record_lock.cpp
${src_dir}/storage/garbage/garbage.cpp
# ${src_dir}/storage/garbage/garbage.cpp
${src_dir}/storage/vertex_accessor.cpp
${src_dir}/storage/edge_accessor.cpp
# ${src_dir}/storage/record_accessor.cpp
${src_dir}/transactions/snapshot.cpp
${src_dir}/transactions/transaction.cpp
${src_dir}/transactions/transaction_read.cpp
@ -319,11 +319,8 @@ set(memgraph_src_files
${src_dir}/logging/default.cpp
${src_dir}/logging/log.cpp
${src_dir}/io/network/tls.cpp
${src_dir}/database/db.cpp
${src_dir}/database/db_transaction.cpp
${src_dir}/database/db_accessor.cpp
${src_dir}/storage/edge_accessor.cpp
${src_dir}/storage/record_accessor.cpp
${src_dir}/database/graph_db.cpp
${src_dir}/database/graph_db_accessor.cpp
)
# -----------------------------------------------------------------------------

View File

@ -2,21 +2,15 @@
# COPY header files required by query engine (query compiler)
# TODO: create a copy function, COPY_RELATIVE(base relative_path dst)
# TODO: somehow automate (in destination dir should be only required include files)
FILE(COPY ${include_dir}/database/db.hpp DESTINATION ${build_include_dir}/database)
FILE(COPY ${include_dir}/database/db_transaction.hpp DESTINATION ${build_include_dir}/database)
FILE(COPY ${include_dir}/database/db_accessor.hpp DESTINATION ${build_include_dir}/database)
FILE(COPY ${include_dir}/database/graph_db.hpp DESTINATION ${build_include_dir}/database)
FILE(COPY ${include_dir}/storage/graph.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/edge.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/edge_record.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/vertex_record.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/edge_accessor.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/vertex.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/vertex_accessor.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/record_accessor.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/storage/locking/record_lock.hpp DESTINATION ${build_include_dir}/storage/locking)
FILE(COPY ${include_dir}/storage/locking/lock_status.hpp DESTINATION ${build_include_dir}/storage/locking)
FILE(COPY ${include_dir}/storage/edge_x_vertex.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/query/util.hpp DESTINATION ${build_include_dir}/query)
FILE(COPY ${include_dir}/query/i_plan_cpu.hpp DESTINATION ${build_include_dir}/query)

View File

@ -42,8 +42,11 @@ public:
encoder.write_struct_header(3);
encoder.write(underlying_cast(pack::Node));
// write the identifier for the node
encoder.write_integer(vertex.id());
// IMPORTANT: here we write a hardcorded 0 because we don't
// use internal IDs, but need to give something to Bolt
// note that OpenCypther has no id(x) function, so the client
// should not be able to do anything with this value anyway
encoder.write_integer(0);
// write the list of labels
auto labels = vertex.labels();
@ -67,9 +70,9 @@ public:
/** Serializes the vertex accessor into the packstream format
*
* struct[size = 5] Edge [signature = 0x52] {
* Integer edge_id;
* Integer start_node_id;
* Integer end_node_id;
* Integer edge_id; // IMPORTANT: always 0 since we don't do IDs
* Integer start_node_id; // IMPORTANT: always 0 since we don't do IDs
* Integer end_node_id; // IMPORTANT: always 0 since we don't do IDs
* String type;
* Map<String, Value> properties;
* }

View File

@ -1,162 +0,0 @@
#pragma once
#include "database/graph_db.hpp"
#include "database/db_transaction.hpp"
#include "utils/border.hpp"
#include "utils/iterator/iterator.hpp"
#include "utils/option.hpp"
#include "storage/model/typed_value_store.hpp"
namespace tx {
class Transaction;
}
class Label;
class EdgeType;
using EdgePropertyFamily = PropertyFamily<TypeGroupEdge>;
using VertexPropertyFamily = PropertyFamily<TypeGroupVertex>;
/*
* DbAccessor
* -Guarantees that access to Vertex and Edge is possible only through
* VertexAccessor and EdgeAccessor.
* -Guarantees that changing Vertex and Edge is possible only using
* VertexAccessor returned by vertex_insert() method and
* EdgeAccessor returned by edge_insert() method.
* -Offers CRUD for Vertex and Edge except iterating over all edges.
*
* VertexAccessor
* By default Vertex::accessor is empty. Caller has to call fill() method
* to fetch valid data and check it's return value. fill() method returns
* true if there is valid data for current transaction false otherwise.
* Only exception to this rule is vertex_insert() method in DbAccessor
* which returns by default filled VertexAccessor.
*
* EdgeAccessor
* By default Edge::accessor is empty. Caller has to call fill() method
* to
* fetch valid data and check it's return value. fill() method returns
* true
* if there is valid data for current transaction false otherwise.
* Only exception to this rule is edge_insert() method in DbAccessor
* which
* returns by default filled EdgeAccessor.
*/
class DbAccessor {
public:
DbAccessor(Db &db);
DbAccessor(Db &db, tx::Transaction &t);
DbAccessor(const DbAccessor &other) = delete;
DbAccessor(DbAccessor &&other) = delete;
//*******************VERTEX METHODS
// Returns iterator of VertexAccessor for all vertices.
// TODO: Implement class specaily for this return
// NOTE: This implementation must be here to be able to infere return type.
auto vertex_access() {
return iter::make_map(
iter::make_iter(this->db_transaction.db.graph.vertices.access()),
[&](auto e) -> auto {
return VertexAccessor(&(e->second), db_transaction);
});
}
// Optionaly return vertex with given internal Id.
Option<const VertexAccessor> vertex_find(const Id &id);
// Creates new Vertex and returns filled VertexAccessor.
VertexAccessor vertex_insert();
// ******************* EDGE METHODS
// Returns iterator of EdgeAccessor for all edges.
// TODO: Implement class specaily for this return
// NOTE: This implementation must be here to be able to infere return type.
auto edge_access() {
return iter::make_map(
iter::make_iter(this->db_transaction.db.graph.edges.access()),
[&](auto e) -> auto {
return EdgeAccessor(&(e->second), db_transaction);
});
}
// Optionally return Edge with given internal Id.
Option<const EdgeAccessor> edge_find(const Id &id);
// Creates new Edge and returns filled EdgeAccessor.
// Slighlty faster than const version.
EdgeAccessor edge_insert(VertexAccessor &from, VertexAccessor &to);
// Creates new Edge and returns filled EdgeAccessor.
EdgeAccessor edge_insert(VertexAccessor const &from,
VertexAccessor const &to);
// ******************* LABEL METHODS
// Finds or crated label with given name.
const Label &label_find_or_create(const char *name);
// True if label with name exists.
bool label_contains(const char *name);
// ******************** TYPE METHODS
// Finds or creates edge_type with given name.
const EdgeType &type_find_or_create(const char *name);
// True if edge_type with given name exists.
bool type_contains(const char *name);
// ******************** PROPERTY METHODS
VertexPropertyFamily &vertex_property_family_get(const std::string &name);
EdgePropertyFamily &edge_property_family_get(const std::string &name);
// ******************** PROPERTY HELPER METHODS
VertexPropertyFamily::PropertyType::PropertyFamilyKey
vertex_property_key(const std::string &name, Type type);
EdgePropertyFamily::PropertyType::PropertyFamilyKey
edge_property_key(const std::string &name, Type type);
template<class T>
VertexPropertyFamily::PropertyType::PropertyTypeKey <T>
vertex_property_key(const std::string &name) {
return vertex_property_family_get(name)
.get(T::type)
.template type_key<T>();
}
template<class T>
EdgePropertyFamily::PropertyType::PropertyTypeKey <T>
edge_property_key(const std::string &name) {
return edge_property_family_get(name)
.get(T::type)
.template type_key<T>();
}
bool update_indexes();
// ******************** TRANSACTION METHODS
// True if commit was successful, or false if transaction was aborted.
bool commit();
// Aborts transaction.
void abort();
private:
// TODO: make this friend generic for all indexes.
template<class T, class K>
friend
class NonUniqueUnorderedIndex;
template<class T, class K>
friend
class UniqueOrderedIndex;
DbTransaction db_transaction;
};

View File

@ -1,67 +0,0 @@
#pragma once
#include "logging/loggable.hpp"
#include "storage/indexes/index_update.hpp"
#include "transactions/transaction.hpp"
class Db;
class DbAccessor;
class SnapshotEncoder;
using index_updates_t = std::vector<IndexUpdate>;
// Inner structures local to transaction can hold ref to this structure and use
// its methods.
// Also serves as a barrier for calling methods defined public but meant for
// internal use. That kind of method should request DbTransaction&.
class DbTransaction : public Loggable
{
friend DbAccessor;
public:
DbTransaction() = delete;
~DbTransaction() = default;
DbTransaction(const DbTransaction& other) = delete;
DbTransaction(DbTransaction&& other) = default;
DbTransaction &operator=(const DbTransaction &) = delete;
DbTransaction &operator=(DbTransaction &&) = default;
DbTransaction(Db &db);
DbTransaction(Db &db, tx::Transaction &trans)
: Loggable("DbTransaction"), db(db), trans(trans) {}
// Global transactional algorithms,operations and general methods meant for
// internal use should be here or should be routed through this object.
// This should provide cleaner hierarchy of operations on database.
// For example cleaner.
// Cleans edge part of database. MUST be called by one cleaner thread at
// one time.
// TODO: Should be exctracted to separate class which can enforce one thread
// at atime.
void clean_edge_section();
// Cleans vertex part of database. MUST be called by one cleaner thread at
// one time..
// TODO: Should be exctracted to separate class which can enforce one thread
// at atime.
void clean_vertex_section();
// Updates indexes of Vertex/Edges in index_updates. True if indexes are
// updated successfully. False means that transaction failed.
// TODO: Should be moved to Indexes class where it will this DbTransaction
// as an argument.
bool update_indexes();
// Will update indexes for given element TG::record_t. Actual update happens
// with call update_indexes
template <class TG>
void to_update_index(typename TG::vlist_t *vlist,
typename TG::record_t *record);
index_updates_t index_updates;
Db &db;
tx::Transaction &trans;
};

View File

@ -1,12 +1,10 @@
#pragma once
#include "data_structures/concurrent/skiplist.hpp"
#include "database/db_transaction.hpp"
#include "snapshot/snapshot_engine.hpp"
#include "storage/garbage/garbage.hpp"
#include "transactions/engine.hpp"
#include "mvcc/version_list.hpp"
#include "utils/pass_key.hpp"
#include "storage/unique_object_store.hpp"
// forward declaring Edge and Vertex because they use
// GraphDb::Label etc., and therefore include this header
@ -24,8 +22,8 @@ class EdgeAccessor;
* Main class which represents Database concept in code.
*/
class GraphDb {
public:
using sptr = std::shared_ptr<GraphDb>;
// definitions for what data types are used for a Label, Property, EdgeType
using Label = uint32_t;
@ -66,31 +64,9 @@ public:
*/
GraphDb(const GraphDb &db) = delete;
/**
* Creates a new Vertex and returns an accessor to it.
*
* @param db_trans The transaction that is creating a vertex.
* @return See above.
*/
VertexAccesor insert_vertex(DbTransaction& db_trans);
/**
* Creates a new Edge and returns an accessor to it.
*
* @param db_trans The transaction that is creating an Edge.
* @param from The 'from' vertex.
* @param to The 'to' vertex'
* @param type Edge type.
* @return An accessor to the edge.
*/
EdgeAccessor insert_edge(DbTransaction& db_trans, VertexAccessor& from,
VertexAccessor& to, EdgeType type);
/** transaction engine related to this database */
tx::Engine tx_engine;
/** garbage collector related to this database*/
// TODO bring back garbage collection
// Garbage garbage = {tx_engine};
@ -99,14 +75,15 @@ public:
// SnapshotEngine snap_engine = {*this};
// database name
// TODO consider if this is even necessary
const std::string name_;
private:
// main storage for the graph
SkipList<mvcc::VersionList<Edge>*> edges_;
SkipList<mvcc::VersionList<Vertex>*> vertices_;
// utility stuff
const PassKey<GraphDb> pass_key;
// unique object stores
UniqueObjectStore<std::string, Label> labels_;
UniqueObjectStore<std::string, EdgeType> edge_types_;
UniqueObjectStore<std::string, Property> properties_;
};

View File

@ -10,15 +10,50 @@
class GraphDbAccessor {
GraphDbAccessor(GraphDb& db) : db_(db), transaction_(db.tx_engine.begin()) {}
public:
tx::Transaction transaction_;
/**
* Creates a new Vertex and returns an accessor to it.
*
* @return See above.
*/
VertexAccessor insert_vertex();
/**
* Creates a new Edge and returns an accessor to it.
*
* @param from The 'from' vertex.
* @param to The 'to' vertex'
* @param type Edge type.
* @return An accessor to the edge.
*/
EdgeAccessor insert_edge(VertexAccessor& from, VertexAccessor& to, GraphDb::EdgeType type);
/**
* Obtains the Label for the label's name.
* @return See above.
*/
GraphDb::Label label(const std::string& label_name);
/**
* Obtains the EdgeType for it's name.
* @return See above.
*/
GraphDb::EdgeType edge_type(const std::string& edge_type_name);
/**
* Obtains the Property for it's name.
* @return See above.
*/
GraphDb::Property property(const std::string& property_name);
/** The current transaction */
tx::Transaction const transaction_;
private:
GraphDb& db_;
// for privileged access to some RecordAccessor functionality (and similar)
const PassKey<GraphDb> pass_key;
};

View File

@ -11,13 +11,13 @@ class Cleaning
public:
// How much sec is a cleaning_cycle in which cleaner will clean at most
// once. Starts cleaner thread.
Cleaning(ConcurrentMap<std::string, Db> &dbs, size_t cleaning_cycle);
Cleaning(ConcurrentMap<std::string, GraphDb> &dbs, size_t cleaning_cycle);
// Destroys this object after this thread joins cleaning thread.
~Cleaning();
private:
ConcurrentMap<std::string, Db> &dbms;
ConcurrentMap<std::string, GraphDb> &dbms;
const size_t cleaning_cycle;

View File

@ -3,8 +3,8 @@
#include "config/config.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "dbms/cleaner.hpp"
#include "snapshot/snapshoter.hpp"
//#include "dbms/cleaner.hpp"
//#include "snapshot/snapshoter.hpp"
class Dbms
{
@ -12,27 +12,29 @@ public:
Dbms() { create_default(); }
// returns active database
Db &active();
GraphDb &active();
// set active database
// if active database doesn't exist creates one
Db &active(const std::string &name);
GraphDb &active(const std::string &name);
// TODO: DELETE action
private:
// creates default database
Db &create_default() { return active("default"); }
GraphDb &create_default() { return active("default"); }
// dbs container
ConcurrentMap<std::string, Db> dbs;
ConcurrentMap<std::string, GraphDb> dbs;
// currently active database
std::atomic<Db *> active_db;
std::atomic<GraphDb *> active_db;
// Cleaning thread.
Cleaning cleaning = {dbs, CONFIG_INTEGER(config::CLEANING_CYCLE_SEC)};
// Snapshoting thread.
Snapshoter snapshoter = {dbs, CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC)};
// // Cleaning thread.
// TODO re-enable cleaning
// Cleaning cleaning = {dbs, CONFIG_INTEGER(config::CLEANING_CYCLE_SEC)};
//
// // Snapshoting thread.
// TODO re-enable cleaning
// Snapshoter snapshoter = {dbs, CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC)};
};

View File

@ -25,7 +25,7 @@ namespace mvcc {
* Note: use only at the beginning of the "other's" lifecycle since this
* constructor doesn't move the RecordLock, but only the head pointer
*/
VersionList(VersionList &&other) : id(other.id) {
VersionList(VersionList &&other) {
this->head = other.head.load();
other.head = nullptr;
}

View File

@ -14,11 +14,11 @@ public:
void set_edge_type(GraphDb::EdgeType edge_type);
EdgeType edge_type() const;
GraphDb::EdgeType edge_type() const;
VertexAccessor from() const;
VertexAccessor to() const;
void remove() const;
void remove();
};

View File

@ -1,32 +0,0 @@
#pragma once
#include "storage/edges.hpp"
#include "storage/edge_type/edge_type_store.hpp"
#include "storage/label/label_store.hpp"
#include "storage/vertices.hpp"
/**
* Graph storage. Contains vertices and edges, labels and edges.
*/
class Graph
{
public:
/**
* default constructor
*
* At the beginning the graph is empty.
*/
Graph() = default;
/** storage for all vertices related to this graph */
Vertices vertices;
/** storage for all edges related to this graph */
Edges edges;
/** storage for all labels */
LabelStore label_store;
/** storage for all types related for this graph */
EdgeTypeStore edge_type_store;
};

View File

@ -1,10 +1,10 @@
#pragma once
#include "database/db_transaction.hpp"
#include "mvcc/version_list.hpp"
#include "transactions/transaction.hpp"
#include "storage/typed_value.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "utils/pass_key.hpp"
template <typename TRecord, typename TDerived>
@ -12,33 +12,18 @@ class RecordAccessor {
public:
RecordAccessor(mvcc::VersionList<TRecord>* vlist, DbTransaction &db_trans)
: vlist_(vlist), db_trans_(db_trans) {
assert(vlist_ != nullptr);
RecordAccessor(mvcc::VersionList<TRecord>* vlist, tx::Transaction &trans)
: vlist_(vlist), trans_(trans) {
record_ = vlist->find(trans_);
}
RecordAccessor(TRecord *t, mvcc::VersionList<TRecord> *vlist, DbTransaction &db_trans)
: record_(t), vlist_(vlist), db_trans_(db_trans) {
assert(record_ != nullptr);
assert(vlist_ != nullptr);
}
// TODO: Test this
TDerived update() const {
assert(!empty());
if (record_->is_visible_write(db_trans_.trans)) {
// TODO: VALIDATE THIS BRANCH. THEN ONLY THIS TRANSACTION CAN SEE
// THIS DATA WHICH MEANS THAT IT CAN CHANGE IT.
return TDerived(record_, vlist_, db_trans_);
} else {
auto new_record_ = vlist_->update(db_trans_.trans);
// TODO: Validate that update of record in this accessor is correct.
const_cast<RecordAccessor *>(this)->record_ = new_record;
return TDerived(new_record, vlist_, db_trans_);
}
/**
* Indicates if this record is visible to the current transaction.
*
* @return
*/
bool is_visible() const {
return record_ != nullptr;
}
TypedValue at(GraphDb::Property key) const {
@ -47,11 +32,12 @@ public:
template <typename TValue>
void set(GraphDb::Property key, TValue value) {
// TODO should update be called here?!?!
update();
record_->props_.set(key, value);
}
size_t erase(GraphDb::Property key) const {
size_t erase(GraphDb::Property key) {
update();
return record_->props_.erase(key);
}
@ -78,13 +64,29 @@ public:
* @param pass_key Ignored.
* @return The version list of this accessor.
*/
mvcc::VersionList<TRecord>* vlist(Passkey<GraphDb> pass_key) {
mvcc::VersionList<TRecord>* vlist(PassKey<GraphDbAccessor> pass_key) const {
return vlist_;
}
protected:
TRecord* record_{nullptr};
/**
* Ensures this accessor is fit for updating functions.
*
* IMPORTANT: This function should be called from any
* method that will change the record (in terms of the
* property graph data).
*/
void update() {
// TODO consider renaming this to something more indicative
// of the underlying MVCC functionality (like "new_version" or so)
if (record_->is_visible_write(trans_))
return;
else
record_ = vlist_->update(trans_);
}
mvcc::VersionList<TRecord>* vlist_;
DbTransaction& db_trans_;
tx::Transaction& trans_;
TRecord* record_;
};

View File

@ -28,7 +28,24 @@ public:
// TODO add in/out functions that return (collection|iterator) over EdgeAccessor
// returns if remove was possible due to connections
bool remove() const;
bool remove();
void detach_remove();
/**
* Adds the given Edge version list to this Vertex's incoming edges.
*
* @param edge_vlist The Edge to add.
* @param pass_key Ensures only GraphDb has access to this method.
*/
void attach_in(mvcc::VersionList<Edge>* edge_vlist, PassKey<GraphDb> pass_key);
/**
* Adds the given Edge version list to this Vertex's outgoing edges.
*
* @param edge_vlist The Edge to add.
* @param pass_key Ensures only GraphDb has access to this method.
*/
void attach_out(mvcc::VersionList<Edge>* edge_vlist, PassKey<GraphDb> pass_key);
void detach_remove() const;
};

View File

@ -1,108 +0,0 @@
#include "database/graph_db.hpp"
#include "database/db_accessor.hpp"
DbAccessor::DbAccessor(Db &db)
: db_transaction(std::move(DbTransaction(db, db.tx_engine.begin()))) {
}
DbAccessor::DbAccessor(Db &db, tx::Transaction &t)
: db_transaction(std::move(DbTransaction(db, t))) {
}
// VERTEX METHODS
Option<const VertexAccessor> DbAccessor::vertex_find(const Id &id) {
return this->db_transaction.db.graph.vertices.find(db_transaction, id);
}
VertexAccessor DbAccessor::vertex_insert() {
return this->db_transaction.db.graph.vertices.insert(db_transaction);
}
// EDGE METHODS
Option<const EdgeAccessor> DbAccessor::edge_find(const Id &id) {
return db_transaction.db.graph.edges.find(db_transaction, id);
}
EdgeAccessor DbAccessor::edge_insert(VertexAccessor &from, VertexAccessor &to) {
auto edge_accessor = db_transaction.db.graph.edges.insert(
db_transaction, from.vlist, to.vlist);
// Connect edge with from,to vertices.
from->data.out.add(edge_accessor.vlist);
to->data.in.add(edge_accessor.vlist);
return edge_accessor;
}
EdgeAccessor DbAccessor::edge_insert(VertexAccessor const &from,
VertexAccessor const &to) {
auto edge_accessor = db_transaction.db.graph.edges.insert(
db_transaction, from.vlist, to.vlist);
// Connect edge with updated from,to vertices.
from.update()->data.out.add(edge_accessor.vlist);
to.update()->data.in.add(edge_accessor.vlist);
return edge_accessor;
}
// LABEL METHODS
const Label &DbAccessor::label_find_or_create(const char *name) {
return db_transaction.db.graph.label_store.find_or_create(name);
}
bool DbAccessor::label_contains(const char *name) {
return db_transaction.db.graph.label_store.contains(name);
}
// TYPE METHODS
const EdgeType &DbAccessor::type_find_or_create(const char *name) {
return db_transaction.db.graph.edge_type_store.find_or_create(name);
}
bool DbAccessor::type_contains(const char *name) {
return db_transaction.db.graph.edge_type_store.contains(name);
}
// PROPERTY METHODS
VertexPropertyFamily &
DbAccessor::vertex_property_family_get(const std::string &name) {
return db_transaction.db.graph.vertices.property_family_find_or_create(
name);
}
EdgePropertyFamily &
DbAccessor::edge_property_family_get(const std::string &name) {
return db_transaction.db.graph.edges.property_family_find_or_create(name);
}
// PROPERTY HELPER METHODS
VertexPropertyFamily::PropertyType::PropertyFamilyKey
DbAccessor::vertex_property_key(const std::string &name, Type type) {
return vertex_property_family_get(name).get(type).family_key();
}
EdgePropertyFamily::PropertyType::PropertyFamilyKey
DbAccessor::edge_property_key(const std::string &name, Type type) {
return edge_property_family_get(name).get(type).family_key();
}
bool DbAccessor::update_indexes() {
return db_transaction.update_indexes();
}
// TRANSACTION METHODS
bool DbAccessor::commit() {
if (db_transaction.update_indexes()) {
db_transaction.trans.commit();
return true;
} else {
// Index update wasn't successfull so whe are aborting transaction.
db_transaction.trans.abort();
return false;
}
}
void DbAccessor::abort() { db_transaction.trans.abort(); }

View File

@ -1,119 +0,0 @@
#include "database/db_transaction.hpp"
#include "database/graph_db.hpp"
#include "serialization/serialization.hpp"
#include "storage/edge.hpp"
#include "storage/edge_type/edge_type.hpp"
#include "storage/indexes/indexes.hpp"
#include "storage/label/label.hpp"
#include "storage/vertex.hpp"
DbTransaction::DbTransaction(Db &db)
: Loggable("DbTransaction"), db(db), trans(db.tx_engine.begin())
{
}
// Cleaning for version lists
template <class A>
void clean_version_lists(A &&acc, Id oldest_active)
{
for (auto &vlist : acc)
{
if (vlist.second.gc_deleted(oldest_active))
{
// TODO: Optimization, iterator with remove method.
bool succ = acc.remove(vlist.first);
// There is other cleaner here
runtime_assert(succ, "Remove has failed");
}
}
}
// Cleans edge part of database. Should be called by one cleaner thread at
// one time.
void DbTransaction::clean_edge_section()
{
Id oldest_active = trans.oldest_active();
// Clean indexes
db.indexes().edge_indexes([&](auto &in) { in.clean(oldest_active); });
// Clean Edge list
clean_version_lists(db.graph.edges.access(), oldest_active);
}
// Cleans vertex part of database. Should be called by one cleaner thread at
// one time.
void DbTransaction::clean_vertex_section()
{
Id oldest_active = trans.oldest_active();
// Clean indexes
db.indexes().vertex_indexes([&](auto &in) { in.clean(oldest_active); });
// Clean vertex list
clean_version_lists(db.graph.vertices.access(), oldest_active);
}
bool DbTransaction::update_indexes()
{
logger.trace("index_updates: {}, instance: {}, transaction: {}",
index_updates.size(), static_cast<void *>(this), trans.id);
while (!index_updates.empty())
{
auto index_update = index_updates.back();
if (index_update.tag == IndexUpdate::EDGE)
{
auto edge = index_update.e;
// TODO: This could be done in batch
// NOTE: This assumes that type index is created with the database.
if (!edge.record->data.edge_type->index().insert(
EdgeTypeIndexRecord(std::nullptr_t(), edge.record,
edge.vlist)))
return false;
if (!db.indexes().update_property_indexes<TypeGroupEdge>(edge,
trans))
return false;
}
else
{
auto vertex = index_update.v;
for (auto label : vertex.record->data.labels())
{
// TODO: This could be done in batch
// NOTE: This assumes that label index is created with the
// database.
if (!label.get().index().insert(LabelIndexRecord(
std::nullptr_t(), vertex.record, vertex.vlist)))
return false;
}
if (!db.indexes().update_property_indexes<TypeGroupVertex>(vertex,
trans))
return false;
}
index_updates.pop_back();
}
return true;
}
template <class TG>
void DbTransaction::to_update_index(typename TG::vlist_t *vlist,
typename TG::record_t *record)
{
index_updates.emplace_back(make_index_update(vlist, record));
logger.trace("update_index, updates_no: {}, instance: {}, transaction: {}",
index_updates.size(), static_cast<void *>(this), trans.id);
}
template void DbTransaction::to_update_index<TypeGroupVertex>(
TypeGroupVertex::vlist_t *vlist, TypeGroupVertex::record_t *record);
template void
DbTransaction::to_update_index<TypeGroupEdge>(TypeGroupEdge::vlist_t *vlist,
TypeGroupEdge::record_t *record);

View File

@ -19,59 +19,3 @@ GraphDb::GraphDb(const std::string &name, bool import_snapshot)
// if (import_snapshot) snap_engine.import();
//}
VertexAccessor GraphDb::insert_vertex(DbTransaction &db_trans) {
auto vertex_vlist = new mvcc::VersionList<Vertex>();
vertex_vlist->insert(db_trans.trans);
// TODO make this configurable
for (int i = 0; i < 5; ++i) {
bool success = vertices_.access().insert(vertex_vlist).second;
if (success)
return VertexAccessor(vertex_vlist, db_trans);
// TODO sleep for some amount of time
}
throw CreationException("Unable to create a Vertex after 5 attempts");
}
EdgeAccessor GraphDb::insert_edge(
DbTransaction& db_trans, VertexAccessor& from,
VertexAccessor& to, EdgeType type) {
auto edge_vlist = new mvcc::VersionList<Edge>();
Edge* edge = edge_vlist->insert(db_trans.trans);
// set the given values of the new edge
edge->edge_type_ = type;
// connect the edge to vertices
edge->from_ = from.vlist(pass_key);
edge->to_ = to.vlist(pass_key);
// connect the vertices to edge
from.vlist(pass_key).out_.emplace(edge_vlist);
to.vlist(pass_key).in_.emplace(edge_vlist);
// TODO make this configurable
for (int i = 0; i < 5; ++i) {
bool success = edges_.access().insert(edge_vlist).second;
if (success)
return EdgeAccessor(edge_vlist, db_trans);
// TODO sleep for some amount of time
}
throw CreationException("Unable to create an Edge after 5 attempts");
EdgeRecord edge_record(next, from, to);
auto edge = edge_record.insert(t.trans);
// insert the new vertex record into the vertex store
auto edges_accessor = edges.access();
auto result = edges_accessor.insert(next, std::move(edge_record));
// create new vertex
auto inserted_edge_record = result.first;
t.to_update_index<TypeGroupEdge>(&inserted_edge_record->second, edge);
return EdgeAccessor(edge, &inserted_edge_record->second, t);
}
`

View File

@ -0,0 +1,59 @@
#include "database/graph_db_accessor.hpp"
VertexAccessor GraphDbAccessor::insert_vertex() {
auto vertex_vlist = new mvcc::VersionList<Vertex>();
vertex_vlist->insert(transaction_);
// TODO make this configurable
for (int i = 0; i < 5; ++i) {
bool success = db_.vertices_.access().insert(vertex_vlist).second;
if (success)
return VertexAccessor(vertex_vlist, transaction_);
// TODO sleep for some amount of time
}
throw CreationException("Unable to create a Vertex after 5 attempts");
}
EdgeAccessor GraphDbAccessor::insert_edge(
VertexAccessor& from,
VertexAccessor& to,
GraphDb::EdgeType type) {
auto edge_vlist = new mvcc::VersionList<Edge>();
Edge* edge = edge_vlist->insert(transaction_);
// set the given values of the new edge
edge->edge_type_ = type;
// connect the edge to vertices
edge->from_ = from.vlist(pass_key);
edge->to_ = to.vlist(pass_key);
// TODO connect the vertices to edge
from.add_to_out(edge_vlist, pass_key);
to.add_to_in(edge_vlist, pass_key);
// from.vlist(pass_key).out_.emplace(edge_vlist);
// to.vlist(pass_key).in_.emplace(edge_vlist);
// TODO make this configurable
for (int i = 0; i < 5; ++i) {
bool success = db_.edges_.access().insert(edge_vlist).second;
if (success)
return EdgeAccessor(edge_vlist, transaction_);
// TODO sleep for some amount of time
}
throw CreationException("Unable to create an Edge after 5 attempts");
}
GraphDb::Label GraphDbAccessor::label(const std::string& label_name) {
return db_.labels_.GetKey(label_name);
}
GraphDb::EdgeType GraphDbAccessor::edge_type(const std::string& edge_type_name){
return db_.edge_types_.GetKey(edge_type_name);
}
GraphDb::Property GraphDbAccessor::property(const std::string& property_name) {
return db_.properties_.GetKey(property_name);
}

View File

@ -9,7 +9,7 @@
#include "logging/default.hpp"
Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs, size_t cleaning_cycle)
Cleaning::Cleaning(ConcurrentMap<std::string, GraphDb> &dbs, size_t cleaning_cycle)
: dbms(dbs), cleaning_cycle(cleaning_cycle)
{
// Start the cleaning thread

View File

@ -1,9 +1,9 @@
#include "dbms/dbms.hpp"
// returns active database
Db &Dbms::active()
GraphDb &Dbms::active()
{
Db *active = active_db.load(std::memory_order_acquire);
GraphDb *active = active_db.load(std::memory_order_acquire);
if (UNLIKELY(active == nullptr)) {
// There is no active database.
return create_default();
@ -14,7 +14,7 @@ Db &Dbms::active()
// set active database
// if active database doesn't exist create one
Db &Dbms::active(const std::string &name)
GraphDb &Dbms::active(const std::string &name)
{
auto acc = dbs.access();
// create db if it doesn't exist

View File

@ -10,23 +10,29 @@ GraphDb::EdgeType EdgeAccessor::edge_type() const {
}
VertexAccessor EdgeAccessor::from() const {
return VertexAccessor(this->record_->from_, this->db_trans_);
return VertexAccessor(this->record_->from_, this->trans_);
}
VertexAccessor EdgeAccessor::to() const {
return VertexAccessor(this->record_->to_, this->db_trans_);
return VertexAccessor(this->record_->to_, this->trans_);
}
void EdgeAccessor::remove() const {
void EdgeAccessor::remove() {
// remove this edge's reference from the "from" vertex
auto& vertex_from_out = from().update().record_->out_;
std::remove(vertex_from_out.begin(), vertex_from_out.end(), record_->from_);
auto vertex_from = from();
vertex_from.update();
std::remove(vertex_from.record_->out_.begin(),
vertex_from.record_->out_.end(),
vlist_);
// remove this edge's reference from the "to" vertex
auto& vertex_to_in = to().update().record_->in_;
std::remove(vertex_to_in.begin(), vertex_to_in.end(), record_->to_);
auto vertex_to = to();
vertex_to.update();
std::remove(vertex_to.record_->in_.begin(),
vertex_to.record_->in_.end(),
vlist_);
// remove this record from the database via MVCC
vlist_->remove(record_, db_trans_.trans);
vlist_->remove(record_, trans_);
}

View File

@ -10,10 +10,12 @@ size_t VertexAccessor::in_degree() const {
}
bool VertexAccessor::add_label(GraphDb::Label label) {
update();
return this->record_->labels_.emplace(label).second;
}
size_t VertexAccessor::remove_label(GraphDb::Label label) {
update();
return this->record_->labels_.erase(label);
}
@ -26,25 +28,34 @@ const std::set<GraphDb::Label> &VertexAccessor::labels() const {
return this->record_->labels_;
}
bool VertexAccessor::remove() const {
bool VertexAccessor::remove() {
// TODO consider if this works well with MVCC
if (out_degree() > 0 || in_degree() > 0)
return false;
vlist_->remove(record_, db_trans_.trans);
vlist_->remove(record_, trans_);
return true;
}
void VertexAccessor::detach_remove() const {
void VertexAccessor::detach_remove() {
// removing edges via accessors is both safe
// and it should remove all the pointers in the relevant
// vertices (including this one)
for (auto edge_vlist : record_->out_)
EdgeAccessor(edge_vlist, db_trans_).remove();
EdgeAccessor(edge_vlist, trans_).remove();
for (auto edge_vlist : record_->in_)
EdgeAccessor(edge_vlist, db_trans_).remove();
EdgeAccessor(edge_vlist, trans_).remove();
vlist_->remove(record_, db_trans_.trans);
vlist_->remove(record_, trans_);
}
void VertexAccessor::attach_in(mvcc::VersionList<Edge>* edge_vlist, PassKey<GraphDb>) {
update();
this->record_->in_.emplace_back(edge_vlist);
}
void VertexAccessor::attach_out(mvcc::VersionList<Edge>* edge_vlist, PassKey<GraphDb>) {
update();
this->record_->out_.emplace_back(edge_vlist);
}

View File

@ -5,7 +5,7 @@
#include "gtest/gtest.h"
#include "utils/total_ordering.hpp"
#include "storage/unique_object_store.h"
#include "storage/unique_object_store.hpp"
/**
* Wraps an int and implements total ordering. Used for testing the